關於存算分離
目前企業級的大數據應用主流還是採用Yarn或者Mesos來進行資源分配和運行調度的,例如我行目前採用Yarn來進行作業調度,並使用HDFS作為大數據的存儲平臺,這是典型的計算和存儲緊耦合的模式,這種方案是通過數據本地化策略來減少數據的網絡傳輸,從而實現良好的計算性能。
隨著業務的發展,支持作業運行所需要的計算資源(CPU、內存、網絡帶寬)的需求量也會不斷增長,就可能出現Hadoop集群的計算資源不足的情況,在目前的架構下我們只能通過擴容集群伺服器的方式來解決,然而這種方式的步驟較為繁瑣,且無法實現計算資源的彈性伸縮,時效性和靈活性較差 。而Spark作業通過Kubernetes進行資源管理和調度的方案可以方便地實現計算資源的動態調整,從而快速適應業務場景的變化,並且還可以實現硬體資源的充分利用並節約成本。
存算分離:將計算模塊改為運行在K8S集群中來實現計算資源的快速調整;而存儲模塊由於狀態複雜,並且不需要進行快速的資源調整和變化,因此可以將計算模塊與存款模塊分離開來,即近期討論較多的存算分離的模式。
Spark Operator
Spark是目前在我行使用範圍非常廣泛的一種大數據計算引擎,本文將主要討論Spark on K8S的實現。
將Spark運行在K8S集群上可以採用Spark官方原生的作業運行方式(https://spark.apache.org/docs/3.0.0/running-on-kubernetes.html),在該模式下提交Spark作業仍然延用了spark-submit命令,並且通過指定K8S集群的ApiServer地址作為master來提交Spark作業,該方式目前對於Spark作業的管理功能較為簡單,並且缺乏統一的資源調度和管理能力。
我們也可以採用Spark Operator的方式,Spark Operator( https://github.com/GoogleCloudPlatform/spark-on-k8s-operator )是由谷歌發起和維護的開源項目,它將Spark和Kubernetes進行了深度的集成,是一個可以管理Spark應用程式生命周期的Kubernetes插件。開發人員通過編寫yaml文件即可在K8S集群上提交Spark作業,而不需要在客戶機上配置spark-submit工具。
Spark Operator還提供了強大的作業管理功能,例如使用sparkctl命令來執行創建、查看、停止作業來管理Spark作業的生命周期,還支持通過ingress的服務暴露模式來訪問作業的UI界面。本文中將主要介紹Spark Operator這種作業運行模式。
Operator是由CoreOS公司推出的, 通過定義CRD(CustomResourceDefinition)和實現相應的Controller來擴展Kubernetes 集群的功能。CRD是從Kubernetes 1.7 版本開始引入的概念,它可以註冊到 kubernetes 集群中,使得用戶可以像使用原生的集群資源(例如 pod、deployment)一樣對CRD對象進行創建、查看、刪除等操作;Controller則會監聽資源的狀態變化並進行處理,嘗試讓CRD 定義的資源達到預期的狀態。
接下來我們來看一下SparkOperator的相關實現原理。
Spark Operator架構
Spark Operator的主要組件如下:
1、SparkApplication Controller : 用於監控並相應SparkApplication的相關對象的創建、更新和刪除事件;
2、Submission Runner:用於接收Controller的提交指令,並通過spark-submit 來提交Spark作業到K8S集群並創建Driver Pod,driver正常運行之後將啟動Executor Pod;
3、Spark Pod Monitor:實時監控Spark作業相關Pod(Driver、Executor)的運行狀態,並將這些狀態信息同步到Controller ;
4、Mutating Admission Webhook:可選模塊,但是在Spark Operator中基本上所有的有關Spark pod在Kubernetes上的定製化功能都需要使用到該模塊,因此建議將enableWebhook這個選項設置為true。
5、sparkctl:基於Spark Operator的情況下可以不再使用kubectl來管理Spark作業了,而是採用Spark Operator專用的作業管理工具sparkctl,該工具功能較kubectl功能更為強大、方便易用。
其中,Controller是作為Spark Operator的核心組件,用於控制和處理pod以及應用運行的狀態變化。
如下代碼片段展示了Controller更新Driver和Executor Pod狀態變化的邏輯:
Spark 應用的狀態機圖示如下:
如下代碼片段展示了Controller更新application應用狀態的相關操作:
Spark Operator通過啟動一個監聽對象ResourceUsageWatcher來實時監聽和更新集群資源的使用情況:
當發生新增應用的調度請求、應用狀態更新,以及新增Pod或者Pod狀態更新等情況下,均可觸發ResourceUsageWatcher的相關操作。例如在onPodUpdated方法中,通過調用ResourceUsageWatcher的setResources來實時更新集群當前的可調度資源:
搭建運行環境
接下來我們通過一個實驗來測試一下Spark Operator的相關特性。
1.本實驗環境所採用的K8S集群為1.15.12 版本,Spark 採用3.0.0版本。
2.Hadoop集群採用的是CDH 5.13版本,Hadoop的版本為2.6.0。
3.下載並編譯Spark Operator :
在sparkctl目錄下執行編譯得到sparkctl工具,並將sparkctl 拷貝到/usr/bin目錄下,即可使用sparkctl命令行工具。
4.為Spark Operator插件創建namespace:
5.通過helm安裝Spark Operator :
6.創建Spark作業的ServiceAccount相關權限對象
在本實驗中ServiceAccount資源的創建使用的是官網上的示例yaml文件,其中ServiceAccount設置為spark,實際工作中我們也可以定義自己的ServiceAccount、Role和RoleBinding資源,並在作業提交時指定相應的對象名稱即可。
一個操作HDFS文件的wordcount實例
編寫示例代碼
本項目中的代碼是一個簡單的Spark wordcount程序,通過讀取存儲在HDFS上的文本文件,並利用Spark計算出在該文本中每個單詞的出現頻率。同時採用Kerberos的認證方式來實現對HDFS的安全訪問。
本示例中所採用的分布式文件系統是HDFS,後續我們也將考慮使用對象存儲平臺來做測試。
主要代碼如下:
生成docker鏡像
1.代碼編寫完成後,通過maven工具對本項目進行編譯和打包,得到該項目的jar包:WordCount1-1.0-SNAPSHOT.jar
2.然後編寫Dockerfile並生成docker鏡像:
Dockerfile文件內容如下:
生成docker鏡像:
將作業運行於K8S集群
打包好應用鏡像之後,編寫words.yaml文件,用於提交作業到K8S集群:
然後我們將在K8S集群中運行該示例程序:
提交作業
通過執行sparkctl create $.yaml 來提交作業:
查看Spark作業的運行狀態
作業提交後,通過執行 sparkctl list 命令可以查看所有已提交的Spark 作業,並可以查看作業當前的運行狀態:
查看應用日誌
通過執行sparkctl log $ 命令即可輸出該作業的詳細日誌信息:
後續我們考慮通過filebeat將pod的日誌信息收集到ELK日誌平臺,從而為用戶提供更為方便的日誌查詢渠道。
查看Spark作業的事件
通過執行sparkctl event $ 命令查看該Spark作業的event信息;
另外,在作業啟動失敗的情況下也可以通過該命令來分析失敗原因:
查看作業運行界面
每個Spark作業的UI服務都通過ingress的暴露方式對外發布,通過執行 kubectl get ingress 命令即可獲取相應作業的ingress地址:
得到ingress地址後,即可通過瀏覽器查看該作業的運行情況,下圖展示的就是wordcount的Spark原生運行界面:
注意:沒有DNS的同學可以在hosts中添加相應的域名映射即可訪問。
至此我們了解了Spark on K8S的相關原理,並從0開始搭建環境並實踐了一個的基於Spark Operator的應用。
目前Spark Operator這個項目還在不斷地更新迭代,我們將持續關注該項目的進展,並繼續在用戶授權、資源隔離、日誌接入和作業監控等方面進行進一步地測試和驗證,逐漸探索出適合我行的Spark on K8S方案。
作者簡介:
焦媛,2011年加入民生銀行,目前主要負責Hadoop平臺運維和相關工具研發,以及HDFS和Spark相關產品的技術支持工作。