EasyScheduler的架構原理及實現思路,大數據工作流調度系統

2021-02-21 21CTO
Easy Scheduler 大數據工作流調度系統已經開源,下載地址:https://github.com/analysys/

在對調度系統架構說明之前,我們先來認識一下調度系統常用的名詞。

名詞解釋

DAG:全稱Directed Acyclic Graph,簡稱DAG。工作流中的Task任務以有向無環圖的形式組裝起來,從入度為零的節點進行拓撲遍歷,直到無後繼節點為止。舉例如下圖

流程定義:通過拖拽任務節點並建立任務節點的關聯所形成的可視化 DAG

流程實例:流程實例是流程定義的實例化,可以通過手動啟動或定時調度生成

任務實例:任務實例是流程定義中任務節點的實例化,標識著具體的任務執行狀態

任務類型:目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同時計劃支持動態插件擴展,注意:其中子 SUB_PROCESS 也是一個單獨的流程定義,是可以單獨啟動執行的

調度方式:系統支持基於cron表達式的定時調度和手動調度。

命令類型支持:啟動工作流、從當前節點開始執行、恢復被容錯的工作流、恢復暫停流程、從失敗節點開始執行、補數、調度、重跑、暫停、停止、恢復等待線程。其中 恢復被容錯的工作流 和 恢復等待線程 兩種命令類型是由調度內部控制使用,外部無法調用

定時調度:系統採用 quartz 分布式調度器,並同時支持cron表達式可視化的生成

依賴:系統不單單支持 DAG 簡單的前驅和後繼節點之間的依賴,同時還提供 任務依賴 節點,支持 流程間的自定義任務依賴

優先級:支持流程實例和任務實例的優先級,如果流程實例和任務實例的優先級不設置,則默認是先進先出

郵件告警:支持 SQL任務 查詢結果郵件發送,流程實例運行結果郵件告警及容錯告警通知

失敗策略:對於並行運行的任務,如果有任務失敗,提供兩種失敗策略處理方式, 繼續 是指不管並行運行任務的狀態,直到流程失敗結束。結束 是指一旦發現失敗任務,則同時Kill掉正在運行的並行任務,流程失敗結束

補數:補歷史數據,支持 區間並行和串行 兩種補數方式

系統架構

系統架構圖

架構說明

MasterServer

MasterServer採用分布式無中心設計理念,MasterServer主要負責 DAG 任務切分、任務提交監控,並同時監聽其它MasterServer和WorkerServer的健康狀態。MasterServer服務啟動時向Zookeeper註冊臨時節點,通過監聽Zookeeper臨時節點變化來進行容錯處理。

該服務內主要包含:

Distributed Quartz分布式調度組件,主要負責定時任務的啟停操作,當quartz調起任務後,Master內部會有線程池具體負責處理任務的後續操作

MasterSchedulerThread是一個掃描線程,定時掃描資料庫中的 command 表,根據不同的 命令類型 進行不同的業務操作

MasterExecThread主要是負責DAG任務切分、任務提交監控、各種不同命令類型的邏輯處理

MasterTaskExecThread主要負責任務的持久化

WorkerServer

WorkerServer也採用分布式無中心設計理念,WorkerServer主要負責任務的執行和提供日誌服務。WorkerServer服務啟動時向Zookeeper註冊臨時節點,並維持心跳。

該服務包含:

FetchTaskThread主要負責不斷從 Task Queue 中領取任務,並根據不同任務類型調用TaskScheduleThread 對應執行器。

LoggerServer是一個RPC服務,提供日誌分片查看、刷新和下載等功能

ZooKeeper

ZooKeeper服務,系統中的MasterServer和WorkerServer節點都通過ZooKeeper來進行集群管理和容錯。另外系統還基於ZooKeeper進行事件監聽和分布式鎖。我們也曾經基於Redis實現過隊列,不過我們希望EasyScheduler依賴到的組件儘量地少,所以最後還是去掉了Redis實現。

Task Queue

提供任務隊列的操作,目前隊列也是基於Zookeeper來實現。由於隊列中存的信息較少,不必擔心隊列裡數據過多的情況,實際上我們壓測過百萬級數據存隊列,對系統穩定性和性能沒影響。

Alert

提供告警相關接口,接口主要包括 告警 兩種類型的告警數據的存儲、查詢和通知功能。其中通知功能又有 郵件通知 和**SNMP(暫未實現)**兩種。

API

API接口層,主要負責處理前端UI層的請求。該服務統一提供RESTful api向外部提供請求服務。接口包括工作流的創建、定義、查詢、修改、發布、下線、手工啟動、停止、暫停、恢復、從該節點開始執行等等。

UI

系統的前端頁面,提供系統的各種可視化操作界面,詳見**系統使用手冊**部分。

架構設計思想

2.3.1 去中心化vs中心化

中心化思想

中心化的設計理念比較簡單,分布式集群中的節點按照角色分工,大體上分為兩種角色:

Master的角色主要負責任務分發並監督Slave的健康狀態,可以動態的將任務均衡到Slave上,以致Slave節點不至於「忙死」或」閒死」的狀態。

Worker的角色主要負責任務的執行工作並維護和Master的心跳,以便Master可以分配任務給Slave。

中心化思想設計存在的問題:

一旦Master出現了問題,則群龍無首,整個集群就會崩潰。為了解決這個問題,大多數Master/Slave架構模式都採用了主備Master的設計方案,可以是熱備或者冷備,也可以是自動切換或手動切換,而且越來越多的新系統都開始具備自動選舉切換Master的能力,以提升系統的可用性。

另外一個問題是如果Scheduler在Master上,雖然可以支持一個DAG中不同的任務運行在不同的機器上,但是會產生Master的過負載。如果Scheduler在Slave上,則一個DAG中所有的任務都只能在某一臺機器上進行作業提交,則並行任務比較多的時候,Slave的壓力可能會比較大。

去中心化

在去中心化設計裡,通常沒有Master/Slave的概念,所有的角色都是一樣的,地位是平等的,全球網際網路就是一個典型的去中心化的分布式系統,聯網的任意節點設備down機,都只會影響很小範圍的功能。

去中心化設計的核心設計在於整個分布式系統中不存在一個區別於其他節點的」管理者」,因此不存在單點故障問題。但由於不存在」 管理者」節點所以每個節點都需要跟其他節點通信才得到必須要的機器信息,而分布式系統通信的不可靠行,則大大增加了上述功能的實現難度。

實際上,真正去中心化的分布式系統並不多見。反而動態中心化分布式系統正在不斷湧出。在這種架構下,集群中的管理者是被動態選擇出來的,而不是預置的,並且集群在發生故障的時候,集群的節點會自發的舉行"會議"來選舉新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go語言實現的Etcd。

EasyScheduler的去中心化是Master/Worker註冊到Zookeeper中,實現Master集群和Worker集群無中心,並使用Zookeeper分布式鎖來選舉其中的一臺Master或Worker為「管理者」來執行任務。

分布式鎖實踐

EasyScheduler使用ZooKeeper分布式鎖來實現同一時刻只有一臺Master執行Scheduler,或者只有一臺Worker執行任務的提交。

獲取分布式鎖的核心流程算法如下

EasyScheduler中Scheduler線程分布式鎖實現流程圖:

線程不足循環等待問題

如果一個DAG中沒有子流程,則如果Command中的數據條數大於線程池設置的閾值,則直接流程等待或失敗。

如果一個大的DAG中嵌套了很多子流程,如下圖則會產生「死等」狀態:

上圖中MainFlowThread等待SubFlowThread1結束,SubFlowThread1等待SubFlowThread2結束, SubFlowThread2等待SubFlowThread3結束,而SubFlowThread3等待線程池有新線程,則整個DAG流程不能結束,從而其中的線程也不能釋放。這樣就形成的子父流程循環等待的狀態。此時除非啟動新的Master來增加線程來打破這樣的」僵局」,否則調度集群將不能再使用。

對於啟動新Master來打破僵局,似乎有點差強人意,於是我們提出了以下三種方案來降低這種風險:

計算所有Master的線程總和,然後對每一個DAG需要計算其需要的線程數,也就是在DAG流程執行之前做預計算。因為是多Master線程池,所以總線程數不太可能實時獲取。

對單Master線程池進行判斷,如果線程池已經滿了,則讓線程直接失敗。

增加一種資源不足的Command類型,如果線程池不足,則將主流程掛起。這樣線程池就有了新的線程,可以讓資源不足掛起的流程重新喚醒執行。

注意:Master Scheduler線程在獲取Command的時候是FIFO的方式執行的。

於是我們選擇了第三種方式來解決線程不足的問題。

容錯設計

容錯分為服務宕機容錯和任務重試,服務宕機容錯又分為Master容錯和Worker容錯兩種情況:

宕機容錯

服務容錯設計依賴於ZooKeeper的Watcher機制,實現原理如圖:

其中Master監控其他Master和Worker的目錄,如果監聽到remove事件,則會根據具體的業務邏輯進行流程實例容錯或者任務實例容錯。

Master容錯流程圖:

ZooKeeper Master容錯完成之後則重新由EasyScheduler中Scheduler線程調度,遍歷 DAG 找到」正在運行」和「提交成功」的任務,對」正在運行」的任務監控其任務實例的狀態,對」提交成功」的任務需要判斷Task Queue中是否已經存在,如果存在則同樣監控任務實例的狀態,如果不存在則重新提交任務實例。

Worker容錯流程圖:

Master Scheduler線程一旦發現任務實例為」 需要容錯」狀態,則接管任務並進行重新提交。

注意:由於」 網絡抖動」可能會使得節點短時間內失去和ZooKeeper的心跳,從而發生節點的remove事件。對於這種情況,我們使用最簡單的方式,那就是節點一旦和ZooKeeper發生超時連接,則直接將Master或Worker服務停掉。

任務失敗重試

這裡首先要區分任務失敗重試、流程失敗恢復、流程失敗重跑的概念:

任務失敗重試是任務級別的,是調度系統自動進行的,比如一個Shell任務設置重試次數為3次,那麼在Shell任務運行失敗後會自己再最多嘗試運行3次

流程失敗恢復是流程級別的,是手動進行的,恢復是從只能 從失敗的節點開始執行 或 從當前節點開始執行

流程失敗重跑也是流程級別的,是手動進行的,重跑是從開始節點進行

接下來說正題,我們將工作流中的任務節點分了兩種類型。

一種是業務節點,這種節點都對應一個實際的腳本或者處理語句,比如Shell節點,MR節點、Spark節點、依賴節點等。

還有一種是邏輯節點,這種節點不做實際的腳本或語句處理,只是整個流程流轉的邏輯處理,比如子流程節等。

每一個 業務節點 都可以配置失敗重試的次數,當該任務節點失敗,會自動重試,直到成功或者超過配置的重試次數。邏輯節點 不支持失敗重試。但是邏輯節點裡的任務支持重試。

如果工作流中有任務失敗達到最大重試次數,工作流就會失敗停止,失敗的工作流可以手動進行重跑操作或者流程恢復操作

任務優先級設計

在早期調度設計中,如果沒有優先級設計,採用公平調度設計的話,會遇到先行提交的任務可能會和後繼提交的任務同時完成的情況,而不能做到設置流程或者任務的優先級,因此我們對此進行了重新設計,目前我們設計如下:

按照 不同流程實例優先級 優先於 同一個流程實例優先級 優先於 同一流程內任務優先級 優先於同一流程內任務 提交順序依次從高到低進行任務處理。

其中流程定義的優先級是考慮到有些流程需要先於其他流程進行處理,這個可以在流程啟動或者定時啟動時配置,共有5級,依次為HIGHEST、HIGH、MEDIUM、LOW、LOWEST。

如下圖

具體實現是根據任務實例的json解析優先級,然後把 流程實例優先級_流程實例id_任務優先級_任務id 信息保存在ZooKeeper任務隊列中,當從任務隊列獲取的時候,通過字符串比較即可得出最需要優先執行的任務

任務的優先級也分為5級,依次為HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下圖

Logback和gRPC實現日誌訪問

由於Web(UI)和Worker不一定在同一臺機器上,所以查看日誌不能像查詢本地文件那樣。有兩種方案:

將日誌放到ES搜尋引擎上

通過gRPC通信獲取遠程日誌信息

介於考慮到儘可能的EasyScheduler的輕量級性,所以選擇了gRPC實現遠程訪問日誌信息。

我們使用自定義Logback的FileAppender和Filter功能,實現每個任務實例生成一個日誌文件。

FileAppender主要實現如下:

/** * task log appender */publicclass TaskLogAppender extends FileAppender<ILoggingEvent {

...

@Override protectedvoid append(ILoggingEvent event) {

if (currentlyActiveFile == null){ currentlyActiveFile = getFile(); } String activeFile = currentlyActiveFile;// thread name:taskThreadName-processDefineId_processInstanceId_taskInstanceId String threadName = event.getThreadName(); String[] threadNameArr = threadName.split("-");// logId = processDefineId_processInstanceId_taskInstanceId String logId = threadNameArr[1]; ... super.subAppend(event); }}

以/流程定義id/流程實例id/任務實例id.log的形式生成日誌

過濾匹配以TaskLogInfo開始的線程名稱:

TaskLogFilter實現如下:

/*** task log filter*/publicclassTaskLogFilterextendsFilter<ILoggingEvent{

@Override public FilterReply decide(ILoggingEvent event){ if (event.getThreadName().startsWith("TaskLogInfo-")){ return FilterReply.ACCEPT; } return FilterReply.DENY; }}

總結

本文從調度出發,初步介紹了大數據分布式工作流調度系統--EasyScheduler的架構原理及實現思路。

相關焦點

  • 圖解 kubernetes scheduler 源碼設計系列-初步了解
    在因為之前親和性的問題,如何在多個zone中的所有node中選擇出一個合適的節點,則是一個比較大的挑戰系統資源除了cpu、內存還包括網絡、磁碟io、gpu等等,針對其餘資源的分配調度,kubernetes還需要提供額外的擴展機制來進行調度擴展的支持kubernetes初期是針對pod調度場景而生,主要其實是在線web業務,這類任務的特點大部分都是無狀態的,那如何針對離線場景的去支持離線的批處理計算等任務
  • DolphinScheduler介紹
    工作流中的Task任務以有向無環圖的形式組裝起來,從入度為零的節點進行拓撲遍歷,直到無後繼節點為止。Apache DolphinScheduler(目前處在孵化階段)是一個分布式、去中心化、易擴展的可視化DAG工作流任務調度系統,其致力於解決數據處理流程中錯綜複雜的依賴關係,使調度系統在數據處理流程中開箱即用。
  • 系統架構
    Loader實現FusionInsight HD與關係型資料庫、文件系統之間交換數據和文件的數據加載工具;同時提供REST API接口,供第三方調度平臺調用。Flume一個分布式、可靠和高可用的海量日誌聚合系統,支持在系統中定製各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫入各種數據接受方(可定製)的能力。
  • Apache DolphinScheduler 是如何誕生的
    工作流中的Task任務以有向無環圖的形式組裝起來,從入度為零的節點進行拓撲遍歷,直到無後繼節點為止。Apache DolphinScheduler (目前處在孵化階段) 是一個分布式、去中心化、易擴展的可視化 DAG 工作流任務調度系統,其致力於解決數據處理流程中錯綜複雜的依賴關係,使調度系統在數據處理流程中開箱即用。
  • Android系統架構開篇
    一、引言本文作為Android系統架構的開篇,起到提綱挈領的作用,從系統整體架構角度概要講解Android系統的核心技術點,帶領大家初探Android系統全貌以及內部運作機制。雖然Android系統非常龐大且錯綜複雜,需要具備全面的技術棧,但整體架構設計清晰。
  • Quartz 集群實戰及原理解析
    , 不然JDK自帶Timer就可以實現相同的功能, 而Timer存在的單點故障是生產環境上所不能容忍的。-- 調度工廠 --> <bean id="scheduler" lazy-init="false" autowire="no"    >       <!
  • 掌握 Android 系統架構,看這一篇就夠了!| 技術頭條
    Binder IPC原理Binder通信採用c/s架構,從組件視角來說,包含Client、Server、ServiceManager以及binder驅動,其中ServiceManager用於管理系統中的各種服務。
  • 系統架構師—著眼於系統的「技術實現」
    關注並標星大同學吧每天1次,打卡閱讀了解崗位職責和必備技能今天是大同學吧崗位專欄第56期職位介紹之系統架構師系統架構師是一個最終確認和評估系統需求,給出開發規範,搭建系統實現的核心構架,並澄清技術細節、掃清主要難點的技術人員。
  • 物聯網系統架構介紹
    只有遵循該套協議的設備相互間能夠通信,能夠交換數據。常用的物聯網通信協議有哪些? 主要有如下協議:MQTT,COAP等,他們有個共同點都是基於消息模型來實現的。設備與設備之間,設備與雲端之間通過交換消息來實現通信,消息裡面攜帶了通信數據。
  • 推薦系統原理、工程、大廠(Youtube、BAT、TMB)架構幹活分享
    本文匯集了關於推薦系統原理、工程、各大長推薦架構、經驗相關的純乾貨。原理篇整理了內容推薦、協同推薦、舉證分解、模型融合、Bandit和深度學習相關的經典方法。工程篇整理了推薦系統常見架構、關鍵模塊和效果驗證相關的資源。實戰部分整理了Netfix、Hulu、Youtube、Google、Amazon、BAT、TMD等各大網際網路公司推薦系統實戰相關的經驗。
  • 各種系統架構圖與詳細說明
    1.1 應用層級說明整體應用系統架構設計分為五個基礎層級,通過有效的層級結構的劃分可以全面展現整體應用系統的設計思路。應用支撐層應用支撐層是整體應用系統建設的基礎保障,根據本次招標文件相關需求,我們進行了相關面向服務體系架構的設計,通過統一的企業級總線服務實現相關應用組件包括工作流、表單、統一管理、資源共享等應用組件進行有效的整合和管理,各個應用系統的建設可以有下基於基礎支撐組件的應用,快速搭建相關功能模塊。
  • Linux C/C++定時器的實現原理和使用方法
    對於內核,簡單來說就是用特定的數據結構管理眾多的定時器,在時鐘中斷處理中判斷哪些定時器超時,然後執行超時處理動作。而用戶空間程序不直接感知CPU時鐘中斷,通過感知內核的信號、IO事件、調度,間接依賴時鐘中斷。用軟體來實現動態定時器常用數據結構有:時間輪、最小堆和紅黑樹。
  • 案例 |智慧鐵水運輸:以無人機車為核心,結合機器視覺、大數據實現鐵水智能調度
    賽迪奇智打造的無人機車整體解決方案成功實現了人工智慧技術在工業運輸場景的落地。以機車無人駕駛和智慧運輸調度為核心,結合大數據分析及機器視覺、機器學習等先進技術,實現了工業場景全天候全流程高效無人化運輸及調度作業。
  • 【乾貨】最全大數據學習資源整理
    框架Apache Hadoop:分布式處理架構,結合了 MapReduce(並行處理)、YARN(作業調度)和HDFS(分布式文件系統);Tigon:高吞吐量實時流處理框架。Apache MapReduce :在集群上使用並行、分布式算法處理大數據集的編程模型;Apache Pig :Hadoop中,用於處理數據分析程序的高級查詢語言;Apache REEF :用來簡化和統一低層大數據系統的保留性評估執行框架;Apache S4 :S4中流處理與實現的框架;Apache Spark :內存集群計算框架;
  • 「數據架構」什麼是實體關係圖(ERD)?
    選擇ERD工具使用ERD開發數據模型需要時間和精力。>【超級架構師】精彩圖文詳解架構方法論,架構實踐,技術原理,技術趨勢。微信小號【cea_csa_cto】50000人社區,討論:企業架構,雲計算,大數據,數據科學,物聯網,人工智慧,安全,全棧開發,DevOps,數位化.
  • 【架構】淺談 Web 網站架構演變過程
    往期關於架構文章可以關注微信公眾號:Java後端,後臺回復 技術博文 獲取。我們以增加了一臺應用伺服器為例,增加後的系統結構圖如下:缺點:代理伺服器可能成為性能的瓶頸,特別是一次上傳大文件。4、IP層負載均衡。在請求到達負載均衡器後,負載均衡器通過修改請求的目的IP位址,從而實現請求的轉發,做到負載均衡。5、數據鏈路層負載均衡。
  • sched 模塊中巨好用的輕量級定時任務神器scheduler!
    Python 提供有一個強大的、可用來定義執行任務調度的 sched 模塊,該模塊中含有一個 scheduler 類,可用來執行更複雜的任務調度
  • 深度解密系統架構背後的技術,雲+社區沙龍online「架構演進」乾貨整理
    既要應對龐大的用戶量、日均數十億 PV 的高並發、PB 級別的數據存儲等問題的挑戰,同時又要求保證系統的高可用和彈性伸縮,並且能夠根據需要進行快速迭代擴展,令人頭疼的系統架構到底應該怎麼做? 多年發展下,網際網路架構經歷了從簡到繁的演進過程,從單體架構到集群架構、分布式架構再到微服務架構,每一種架構都是為了解決問題而生。
  • Linux塊設備IO子系統
    #IO調度就是電梯算法。我們知道,磁碟是的讀寫是通過機械性的移動磁頭來實現讀寫的,理論上磁碟設備滿足塊設備的隨機讀寫的要求,但是出於節約磁碟,提高效率的考慮,我們希望當磁頭處於某一個位置的時候,一起將最近需要寫在附近的數據寫入,而不是這寫一下。