作者 | 韓飛
1
引言
隨著大數據技術的不斷發展,數據實時性的需求變得越來越迫切,這對實時數據處理的基礎架構提出了更高的要求。如何應對實時數據的流量變化,特別是突發流量,成為實時數據處理架構不得不面對的挑戰性問題。對於 FreeWheel 這樣一家服務全美 90% 的主流電視媒體和運營商的視頻廣告投放和管理平臺,對於突發流量的應對能力尤為重要。數據基礎架構團隊作為 FreeWheel 數據處理的載體,提供了公司級的實時和離線數據處理及查詢服務。在本文中,我們將和大家分享我們在實時數據處理彈性伸縮方面的實踐。
2
FreeWheel 實時數據平臺架構
FreeWheel 數據基礎架構基於大數據開源軟體和部分自研軟體構建,所有服務均部署在 AWS EC2 上。目前,整個數據平臺為 Lambda 架構,並逐漸向流批一體的融合平臺發展。其中,實時數據平臺整體上主要分為四個部分:基於 Kafka 構建的消息隊列,自研的消息處理中間件 Matcher,基於 SQL 開發方式的實時計算平臺 SQL on Streaming(SOS)以及以 Druid 為主體構建的實時數據查詢服務。
如上圖所示,FreeWheel 的廣告伺服器 ADS 部署在美國不同地區的 11 個 Data Center,例如 NYC,ASH 等,並將日誌數據以 PB 格式寫入本地的 Kafka 服務(LocalKafka),然後通過 MirrorMaker 等服務將數據同步到部署在 AWS 的 Global Kafka。Matcher 是 FreeWheel 自研的消息處理中間件,主要用於匹配廣告投放過程中的 Request 和 Ack,並進行必要的數據去重。SOS 基於 Spark Streaming 和 SparkSQL,對實時應用開發中依賴的數據讀寫進行抽象及封裝,並支持消費端位點管理,Join 維度數據,寫數據冪等,數據模型依賴,彈性伸縮等特性。用戶只需關心讀寫端元信息及用 SQL 表達計算邏輯而無需關心其他細節,同時為了 方便用戶使用,結合 CICD 等為用戶構建功能,性能以及回歸測試服務,使用戶的使用儘量趨於自助化(Self-Service)。實時數據處理的下遊是基於 Druid 構建的數據存儲及查詢服務,並自研針對 Druid Segment 小文件的合併工具,基於實際流量進行數據攝入自動伸縮容,用戶不友好查詢的追蹤分析等周邊服務,保障高服務質量。同時,對於數據 Join 需求較高的場景,通過實現 Presto Druid Connector 來覆蓋這一部分的需求。
3
FreeWheel 實時計算彈性伸縮
現狀及需求
由於 FreeWheel 流量高低時段相對比較規律,因此最早我們結合 Jenkins Job,按時段規律定期的對 SOS 的應用進行伸縮。這種方式會以 Graceful 的方式重啟應用,在相當一段時間裡可以滿足應用計算資源的按需增減以及節約整體計算成本(Cost Saving)的要求。然而隨著平臺的發展,有些問題開始逐漸凸顯,主要有兩個:一是對於突發的流量增長,仍需工程師介入進行手動處理;二是這種定時啟停的方式會增加整個實時應用不可用的時間,對於實時數據這種 SLA 要求較高的系統並不友好。因此,使實時計算具備基於實際數據流量進行動態伸縮的能力成為我們必須要解決的問題,這也與 FreeWheel 數據系統 Self-Service 的理念保持一致。
Spark Streaming Dynamic ResourceAllocation
由於歷史及技術棧的原因,SOS 基於 Spark 生態的技術構建並運行在 Yarn 上。對於 Spark 的動態資源調度,大家可能知道 Dynamic Resource Allocation,它的原理比較直觀,在 ExecutorAllocationManager 組件中啟動一個周期性任務監聽 Executor 狀態,如果 Executor 在一段時間內一直處於空閒狀態,則殺掉該 Executor,如果 Executor 一直處於高負載的狀態,則增加 Executor 數量。與此同時,為保障處理性能的穩定並且對 Executor 的增減進行約束,可以定義 Executor 數量的最大及最小值。Dynamic Resource Allocation 這個特性常常被應用在 Spark 離線計算中。而怎樣殺死或者新增 Executor 呢?如下圖所示,藉助於內嵌在 ExecutorAllocationManager 中的 ExecutorAllocationClient,可以將計算出的 Executor 的新數量發送給 ClusterManager,由後者完成計算資源的重新調度。
實際上,SparkStreaming 也有 Dynamic ResourceAllocation 的特性,不過在官方文檔中鮮有提及。它的原理類似,在 Streaming 的 ExecutorAllocationManager 中同樣啟動一個周期運行的監聽任務,它定時的判斷當前計算資源是否充足,以此來計算是否要新增或者殺死 Executor,而判斷的依據源於計算一個 Ratio 值,Ratio 值的計算依賴於 Spark 用於流式計算過程中收集 Metrics 的組件 StreamingListener。藉助於 StreamingListener,可以在 Spark Streaming 每個 Batch 結束後獲取關於該 Batch 處理的一些 Metrics,例如調度延遲(SchedulingDelay),處理延遲(ProcessingDelay),處理條數(NumRecords)等。
假設當前 Spark Streaming 應用的 BatchDuration 是 30 秒,配置的周期性調度的時間是 2 分鐘,那麼每次調度時,我們可以拿到過去 4 個 Batch 的處理情況並計算出處理延遲的平均值,前邊提到的 Ratio 值的算法就是用平均處理延遲除以 Batch 時間得到,如下圖所示。假設過去 4 個 Batch 的平均處理延遲是 20 秒,我們可以估算出目前應用的處理能力大約還有三分之一的彈性空間。我們可以設置彈性伸縮相關的閾值 Scale Up 和 ScaleDown,那麼當 Ratio 的值大於 Scale Up 閾值,應用會向 Spark 的 Cluster Manager 發送請求申請新增 Executor,當小於 Scale Down 閾值時,應用會從當前 Executor 中選擇不包含 Receiver 的 Executor 並從中隨機選取一個,然後發送請求殺死該 Executor。同樣的,可以定義 Executor 數量的最大最小值。以上就是原生 Spark Streaming Dynamic Resource Allocation 的基本原理。需要注意的是,新增 Executor 時,需要保證運行應用的資源池資源足夠,例如假設應用運行在 Yarn 上,此時需要保證應用所在的隊列資源足夠。
SOS Dynamic Resource Allocation
SOS DynamicResource Allocation(簡稱 SOS DRA)在繼承原生特性的基礎上,有幾點定製化的實現。首先思考一個場景,假設 SOS 應用拉取 Kafka 數據的能力已接近瓶頸,基於當前的計算資源算出的平均處理延遲會趨於一個恆定值,假如此時數據生產的速度大於處理速度,那麼整體來講,SOS 消費數據的 Lag 仍在不斷增長。因此,對於前文提到的 Ratio 值的計算,我們引入當前應用的消費 Lag,結合平均處理延遲來加權計算 Ratio。Ratio 的計算邏輯如下邊公式所示:
其中, timeBasedRatio 為原始計算邏輯,lagBasedRatio 由當前消費 Lag 的值除以過去一輪 SOS DRA 調度中每個 Batch 處理的數據條數的平均值得到。timeWeight,lagWeight 分別為兩種 Ratio 對應的權重。因為 SOS DRA 依賴外部服務,那麼自然存在外部依賴不可用的情況。為保障服務的健壯性,我們對可能存在的異常場景進行針對處理。例如,當依賴的 Rest 服務出現問題而無法拿到消費 Lag 信息,SOS DRA 的調度算法將退化到只基於平均處理延遲計算 Ratio 值,即當 Rest 服務出現問題時,lagWeight 調整為 0,timeWeight 調整為 1。此時,整個表達式退化為原始計算邏輯。
其次,由於數據基礎架構團隊的軟體服務都部署在 AWS EC2 上,因此 ScaleUp 和 Scale Down 會涉及 AWS EC2 資源的申請與釋放。數據基礎架構團隊結合 AWS Auto Scaling Groups 自研 AWS EC2 資源服務 Asger。當涉及新增及刪除 Executor 時,SOS DRA 會調用 Asger 服務,由後者完成 AWS EC2 資源的申請釋放以及 SOS 應用所在 Yarn 隊列的資源更新。第三,由於不想在 SOS DRA 中耦合更多邏輯,額外開發一個 Rest 服務,用於獲取當前的 Lag 信息以及配置一些其他的調度規則等。第四,將每個調度周期計算的 Ratio 值及更新後 Executor 數量暴露到 JMX,這樣這些信息最終會被採集並展現在基於 Datadog 構建的監控系統中用於實時追蹤當前的調度情況。SOS DRA 的原理如下圖。
實際部署效果如下圖示例,該示例中 SOS 應用啟動時指定 20 臺 Executor,並分別指定 ScaleUp 和 Scale Down 的閾值為 1.0 和 0.7。由下圖一圖二可見,分別在 12:45 左右,13:00 左右,當 Ratio 閾值小於 0.7 時,Executor 數量由 20 臺逐漸縮減至 17 臺。在接近 13:20,當遭遇突發流量波動,Ratio 值超過 1 時,Executor 數量由 17 臺彈回至 19 臺。當時間超過 13:35,Ratio 值再次小於 0.7 時,機器再次由 19 臺縮減至 18 臺。
4
Druid 實時數據攝入彈性伸縮
ApacheDruid 具備很多的優點,例如數據預聚合,低耦合架構,多種數據攝入方式,與大數據生態良好的親和性等。因此結合廣告業務特點,FreeWheel 實時數據系統下遊選擇 Druid 作為數據存儲及查詢的核心引擎。同時出於高可用性,維護友好等角度考慮,我們將 Druid 部署在 AWS EKS 上。
Druid 實時數據攝入原理
Druid 的數據攝入分為實時(Realtime)以及離線(Batch)兩種,整體呈現 Lambda 架構,Druid 架構中 MiddleManager 角色負責啟動 Peon 進程,由 Peon 來完成數據的真正攝入。實時數據的攝入主要針對上遊是 Kafka,Kinesis 等消息隊列的場景,Peon 集成的 Kafka Consumer Client 會 7*24 實時讀取上遊消息隊列中的數據並保障 Exactly Once 的語義。離線數據導入主要針對將 HDFS,S3 上的數據批量導入 Druid 的場景。
當需要將數據由 Kafka 導入 Druid 時,用戶需要指定一個 Supervisor 任務,通過定義 Payload 文件以 Json 格式來描述需要導入的信息,並提交給 Overload 節點,由 Overload 節點通知 MiddleManager 啟動 Peon 進程完成數據的攝入。任務提交後,交由 Overload 進程中的 SupervisorManager 對象處理,它對維護的 Supervisor 內部狀態做必要的更新然後生成 KafkaSupervisorSpec 並交由 KafkaSupervisor 處理。KafkaSupervisor 負責管理 IngestTask 的生命周期,它接收 KafkaSupervisorSpec 作為輸入,生成對應的 Tasks 並管理這些 Tasks 與 KafkaTopic 分區的映射關係。KafkaSupervisor 內部還包含一個處理線程,它不斷的消費 Notice 隊列的內容並調用對應的 handle 方法。Notice 是 Druid 中對 SupervisorOperation 的抽象表示,例如我們提交,暫停或者重置 Supervisor 時會分別對應 RunNotice,ShutdownNotice 和 ResetNotice。綜上可見,我們要實現實時數據攝入的動態伸縮,本質上是要幹預 Task 任務的生成。實時數據攝入的原理如下圖,其中紅色方框的部分為我們新增的部分,後文中將繼續介紹。
FreeWheel 對 Druid 實時數據攝入彈性伸縮的改進
由於 Druid 攝入 Kafka 數據是否及時可以根據消費 Lag 來直接判斷,因此我們按固定間隔採集 Lag 信息,基於此新增一種 Notice 的實現 DynamicAllocationTaskNotice 並將它插入到 Notice 隊列裡。具體地,在 KafkaSupervisor 中增加記錄 Lag 的定長隊列 CircularFifoQueue,按固定間隔插入採集到的 Lag 信息。當彈性伸縮的調度服務被定時觸發時,它會創建 DynamicAllocationTaskNotice。當 DynamicAllocationTaskNotice 的 handle 方法被調用時,它會判斷定長隊列裡 Lag 的情況,當超過 Scale Up 閾值的點的比例超過設置的閾值,則按指定的步長進行 Scale Up,反之,當小於 Scale Down 閾值的點的比例小於設置的閾值,則按指定的步長進行 Scale Down。同時,通過設定 Task 數量的最大值和最小值,來保障單個數據源佔用的資源在一定範圍內。原理圖如下:
在 Payload 文件中新增 dynamicAllocationTasksProperties 的配置模塊,該特性的具體新增配置屬性如下:
實際運行效果如下圖所示,在美國大選期間的某一時間段,突發一流量高峰,從圖一中可以看到某 Topic 在大約 11:20-13:20 之間流量從每秒不到 100K 條數據增長到最高值超過每秒 286K 條。圖二為該 Topic 對應的 Druid 數據攝入的 Task 數量,如圖中所示,大約在相同時間段,Task 數量從之前的 6 個自動擴展到 10 個,達到設定的最大值,在流量高峰過去後,自動縮回到之前 6 個的水平,整個過程無需工程師幹預。
我們目前實現的實時數據攝入動態伸縮是基於當前 MiddleManager 所能提供的 Task 總容量進行的,可以滿足線上需求。
5
未來計劃
隨著業務規模的不斷擴大,對於狀態需求較高的實時應用,例如支持 Late Data,SessionWindow 等,我們正在考慮為 SOS 支持 Flink 引擎。
整體架構層面,我們正在進行這幾個方面的實踐:構建流批融合的一站式平臺,通過梳理指標規範模型進一步降低用戶的開發成本,最大程度的支持整體平臺的用戶自助服務(Self-Service)。
作者介紹:
韓飛,Lead Software Engineer,FreeWheel。碩士畢業於清華大學軟體學院,目前就職於 FreeWheel 數據基礎架構團隊,任實時數據系統研發負責人。擁有多年數據平臺建設經驗,主要研究領域包括實時計算、實時數倉,OLAP、數據交換等。
今日薦文