Spark Operator 初體驗

2020-12-11 騰訊網

關於存算分離

目前企業級的大數據應用主流還是採用Yarn或者Mesos來進行資源分配和運行調度的,例如我行目前採用Yarn來進行作業調度,並使用HDFS作為大數據的存儲平臺,這是典型的計算和存儲緊耦合的模式,這種方案是通過數據本地化策略來減少數據的網絡傳輸,從而實現良好的計算性能。

隨著業務的發展,支持作業運行所需要的計算資源(CPU、內存、網絡帶寬)的需求量也會不斷增長,就可能出現Hadoop集群的計算資源不足的情況,在目前的架構下我們只能通過擴容集群伺服器的方式來解決,然而這種方式的步驟較為繁瑣,且無法實現計算資源的彈性伸縮,時效性和靈活性較差 。而Spark作業通過Kubernetes進行資源管理和調度的方案可以方便地實現計算資源的動態調整,從而快速適應業務場景的變化,並且還可以實現硬體資源的充分利用並節約成本。

存算分離:將計算模塊改為運行在K8S集群中來實現計算資源的快速調整;而存儲模塊由於狀態複雜,並且不需要進行快速的資源調整和變化,因此可以將計算模塊與存款模塊分離開來,即近期討論較多的存算分離的模式。

Spark Operator

Spark是目前在我行使用範圍非常廣泛的一種大數據計算引擎,本文將主要討論Spark on K8S的實現。

將Spark運行在K8S集群上可以採用Spark官方原生的作業運行方式(https://spark.apache.org/docs/3.0.0/running-on-kubernetes.html),在該模式下提交Spark作業仍然延用了spark-submit命令,並且通過指定K8S集群的ApiServer地址作為master來提交Spark作業,該方式目前對於Spark作業的管理功能較為簡單,並且缺乏統一的資源調度和管理能力。

我們也可以採用Spark Operator的方式,Spark Operator( https://github.com/GoogleCloudPlatform/spark-on-k8s-operator )是由谷歌發起和維護的開源項目,它將Spark和Kubernetes進行了深度的集成,是一個可以管理Spark應用程式生命周期的Kubernetes插件。開發人員通過編寫yaml文件即可在K8S集群上提交Spark作業,而不需要在客戶機上配置spark-submit工具。

Spark Operator還提供了強大的作業管理功能,例如使用sparkctl命令來執行創建、查看、停止作業來管理Spark作業的生命周期,還支持通過ingress的服務暴露模式來訪問作業的UI界面。本文中將主要介紹Spark Operator這種作業運行模式。

Operator是由CoreOS公司推出的, 通過定義CRD(CustomResourceDefinition)和實現相應的Controller來擴展Kubernetes 集群的功能。CRD是從Kubernetes 1.7 版本開始引入的概念,它可以註冊到 kubernetes 集群中,使得用戶可以像使用原生的集群資源(例如 pod、deployment)一樣對CRD對象進行創建、查看、刪除等操作;Controller則會監聽資源的狀態變化並進行處理,嘗試讓CRD 定義的資源達到預期的狀態。

接下來我們來看一下SparkOperator的相關實現原理。

Spark Operator架構

Spark Operator的主要組件如下:

1、SparkApplication Controller : 用於監控並相應SparkApplication的相關對象的創建、更新和刪除事件;

2、Submission Runner:用於接收Controller的提交指令,並通過spark-submit 來提交Spark作業到K8S集群並創建Driver Pod,driver正常運行之後將啟動Executor Pod;

3、Spark Pod Monitor:實時監控Spark作業相關Pod(Driver、Executor)的運行狀態,並將這些狀態信息同步到Controller ;

4、Mutating Admission Webhook:可選模塊,但是在Spark Operator中基本上所有的有關Spark pod在Kubernetes上的定製化功能都需要使用到該模塊,因此建議將enableWebhook這個選項設置為true。

5、sparkctl:基於Spark Operator的情況下可以不再使用kubectl來管理Spark作業了,而是採用Spark Operator專用的作業管理工具sparkctl,該工具功能較kubectl功能更為強大、方便易用。

其中,Controller是作為Spark Operator的核心組件,用於控制和處理pod以及應用運行的狀態變化。

如下代碼片段展示了Controller更新Driver和Executor Pod狀態變化的邏輯:

Spark 應用的狀態機圖示如下:

如下代碼片段展示了Controller更新application應用狀態的相關操作:

Spark Operator通過啟動一個監聽對象ResourceUsageWatcher來實時監聽和更新集群資源的使用情況:

當發生新增應用的調度請求、應用狀態更新,以及新增Pod或者Pod狀態更新等情況下,均可觸發ResourceUsageWatcher的相關操作。例如在onPodUpdated方法中,通過調用ResourceUsageWatcher的setResources來實時更新集群當前的可調度資源:

搭建運行環境

接下來我們通過一個實驗來測試一下Spark Operator的相關特性。

1.本實驗環境所採用的K8S集群為1.15.12 版本,Spark 採用3.0.0版本。

2.Hadoop集群採用的是CDH 5.13版本,Hadoop的版本為2.6.0。

3.下載並編譯Spark Operator :

在sparkctl目錄下執行編譯得到sparkctl工具,並將sparkctl 拷貝到/usr/bin目錄下,即可使用sparkctl命令行工具。

4.為Spark Operator插件創建namespace:

5.通過helm安裝Spark Operator :

6.創建Spark作業的ServiceAccount相關權限對象

在本實驗中ServiceAccount資源的創建使用的是官網上的示例yaml文件,其中ServiceAccount設置為spark,實際工作中我們也可以定義自己的ServiceAccount、Role和RoleBinding資源,並在作業提交時指定相應的對象名稱即可。

一個操作HDFS文件的wordcount實例

編寫示例代碼

本項目中的代碼是一個簡單的Spark wordcount程序,通過讀取存儲在HDFS上的文本文件,並利用Spark計算出在該文本中每個單詞的出現頻率。同時採用Kerberos的認證方式來實現對HDFS的安全訪問。

本示例中所採用的分布式文件系統是HDFS,後續我們也將考慮使用對象存儲平臺來做測試。

主要代碼如下:

生成docker鏡像

1.代碼編寫完成後,通過maven工具對本項目進行編譯和打包,得到該項目的jar包:WordCount1-1.0-SNAPSHOT.jar

2.然後編寫Dockerfile並生成docker鏡像:

Dockerfile文件內容如下:

生成docker鏡像:

將作業運行於K8S集群

打包好應用鏡像之後,編寫words.yaml文件,用於提交作業到K8S集群:

然後我們將在K8S集群中運行該示例程序:

提交作業

通過執行sparkctl create $.yaml 來提交作業:

查看Spark作業的運行狀態

作業提交後,通過執行 sparkctl list 命令可以查看所有已提交的Spark 作業,並可以查看作業當前的運行狀態:

查看應用日誌

通過執行sparkctl log $ 命令即可輸出該作業的詳細日誌信息:

後續我們考慮通過filebeat將pod的日誌信息收集到ELK日誌平臺,從而為用戶提供更為方便的日誌查詢渠道。

查看Spark作業的事件

通過執行sparkctl event $ 命令查看該Spark作業的event信息;

另外,在作業啟動失敗的情況下也可以通過該命令來分析失敗原因:

查看作業運行界面

每個Spark作業的UI服務都通過ingress的暴露方式對外發布,通過執行 kubectl get ingress 命令即可獲取相應作業的ingress地址:

得到ingress地址後,即可通過瀏覽器查看該作業的運行情況,下圖展示的就是wordcount的Spark原生運行界面:

注意:沒有DNS的同學可以在hosts中添加相應的域名映射即可訪問。

至此我們了解了Spark on K8S的相關原理,並從0開始搭建環境並實踐了一個的基於Spark Operator的應用。

目前Spark Operator這個項目還在不斷地更新迭代,我們將持續關注該項目的進展,並繼續在用戶授權、資源隔離、日誌接入和作業監控等方面進行進一步地測試和驗證,逐漸探索出適合我行的Spark on K8S方案。

作者簡介:

焦媛,2011年加入民生銀行,目前主要負責Hadoop平臺運維和相關工具研發,以及HDFS和Spark相關產品的技術支持工作。

相關焦點

  • Spark RDD上的map operators是如何pipeline起來的
    當時我很自然地回答說:不需要多次循環,spark會將多個map操作pipeline起來apply到rdd partition的每個data element上。事後仔細想了想這個問題,雖然我確信spark不可能傻到每個map operator都循環遍歷一次數據,但是這些map操作具體是怎麼被pipeline起來apply的呢?這個問題還真不太清楚。
  • 是時候放棄Spark Streaming,轉向Structured Streaming了
    這樣導致開發者的體驗非常不好,也是任何一個基礎框架不想看到的(基礎框架的口號一般都是:你們專注於自己的業務邏輯就好,其他的交給我)。這也是很多基礎系統強調 Declarative 的一個原因。另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。Execution:復用 Spark SQL 的執行引擎。
  • Spark簡介
    object WordCount { def main(args: Array[String]): Unit = {   val appName = "WordCountApp"   val spark = SparkSession     .builder()     .appName(appName)     .master
  • Apache Spark 1.5.0 正式發布 - OSCHINA - 中文開源技術交流社區
    for RowMatrix[SPARK-7387] - CrossValidator example code in Python[SPARK-7422] - Add argmax to Vector, SparseVector[SPARK-7440] - Remove physical Distinct operator
  • Kubernetes API 編程利器:Operator 和 Operator Framework
    按照處理類型的不同,一般可以將其分為兩類:一類可能會修改傳入對象,稱為 mutating webhook;一類則會只讀傳入對象,稱為 validating webhook。工作隊列:controller 的核心組件。
  • 《無畏契約》Operator好用嗎 Operator介紹
    無畏契約operator好用嗎?operator是遊戲中一把非常強勢的槍,很多小夥伴可能對它還不是很了解吧,今天小編給大家帶來瓦羅蘭特Operator介紹,快來看一下吧。 無畏契約operator好用嗎?
  • Spark 2.0系列之SparkSession詳解
    的各項功能,用戶不但可以使用DataFrame和Dataset的各種API,學習Spark的難度也會大大降低。SparkSession的功能首先,我們從一個Spark應用案例入手:SparkSessionZipsExample可以從JSON文件中讀取郵政編碼,通過DataFrame API進行分析,同時還能夠使用Spark SQL語句實施查詢。
  • 問題排查 | Spark OrcFileFormat inferSchema執行巨慢問題分析
    :69) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:77) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala
  • 「新思想」Spark課程理念在體育教學中的應用
    筆者通過對該課程初體驗的感受,提出spark課程對體育教學的借鑑及應用。1.在教育中我們講求「潤物無聲」,在spark教學中會利用注意力遷移達到運動無形,卸下學生的運動負擔。「快樂體育、健康體育」,讓學生體驗快樂,感受到運動的快樂,才能促進學生主動鍛鍊,從而達到身心健康。
  • 一、Spark概述
    2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 『 Spark 』2. spark 基本概念解析
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,並非為了做什麼教程,所以一切以個人理解梳理為主,沒有必要的細節就不會記錄了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好還是必要的。1.
  • 當MongoDB遇見Spark
    MongoDB的話, 那就不用再轉存一份到HDFS上了可以利用MongoDB強大的Aggregate做數據的篩選或預處理MongoDB Spark Connector介紹支持讀取和寫入,即可以將計算後的結果寫入MongoDB將查詢拆分為n個子任務, 如Connector會將一次match,拆分為多個子任務交給spark
  • Spark【面試】
     spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。 spark:spark的shuffle是在DAGSchedular劃分Stage的時候產生的,TaskSchedule要分發Stage到各個worker的executor。 減少shuffle可以提高性能。23、RDD機制?
  • 『 Spark 』13. Spark 2.0 Release Notes 中文版
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,所以一切以能夠理解為主,沒有必要的細節就不會記錄了,而且文中有時候會出現英文原版文檔,只要不影響理解,都不翻譯了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本號還是必要的。
  • 『 Spark 』9. 搭建 IPython + Notebook + Spark 開發環境
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,所以一切以能夠理解為主,沒有必要的細節就不會記錄了,而且文中有時候會出現英文原版文檔,只要不影響理解,都不翻譯了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本號還是必要的。
  • 使用Golang實現Spark UDF
    組合即將大功告成,但我們需要一個spark jar包用來做編譯。我寫了一個Makefile,它會下載spark jar包、存儲到./spark_jars目錄下。有了spark jar包後就可以編譯Java代碼了:javac -cp \ spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar \ java/com/community/EventsUDF/Decoder.java
  • ​Spark Core基礎面試題總結(上)
    ://https://<k8s-apiserver-host>:<k8s-apiserver-port> \    --deploy-mode cluster \    --name spark-pi \    --class org.apache.spark.examples.SparkPi \    --conf spark.executor.instances
  • Spark SQL重點知識總結
    ,路徑在下面的代碼中case class Person(name:String,age:Int)val peopleDF=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt").map(_.split(",")).map(para=>Person(para
  • Spark-TFRecord: Spark將全面支持TFRecord
    Spark-TensorFlow-Connector 是 TensorFlow 生態圈的一部分,並且是由 DataBricks,spark 的創始公司提供。儘管 Spark-TensorFlow-Connector 提供基本的讀寫功能,但是我們在LinkedIn的使用中發現了兩個問題。首先,它基於RelationProvider接口。
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.指定數據源通常需要使用數據源全名(如org.apache.spark.sql.parquet),但對於內建數據源,你也可以使用它們的短名(json、parquet和jdbc)。並且不同的數據源類型之間都可以相互轉換。