FreeWheel實時數據系統彈性伸縮實踐

2020-12-10 騰訊網

作者 | 韓飛

1

引言

隨著大數據技術的不斷發展,數據實時性的需求變得越來越迫切,這對實時數據處理的基礎架構提出了更高的要求。如何應對實時數據的流量變化,特別是突發流量,成為實時數據處理架構不得不面對的挑戰性問題。對於 FreeWheel 這樣一家服務全美 90% 的主流電視媒體和運營商的視頻廣告投放和管理平臺,對於突發流量的應對能力尤為重要。數據基礎架構團隊作為 FreeWheel 數據處理的載體,提供了公司級的實時和離線數據處理及查詢服務。在本文中,我們將和大家分享我們在實時數據處理彈性伸縮方面的實踐。

2

FreeWheel 實時數據平臺架構

FreeWheel 數據基礎架構基於大數據開源軟體和部分自研軟體構建,所有服務均部署在 AWS EC2 上。目前,整個數據平臺為 Lambda 架構,並逐漸向流批一體的融合平臺發展。其中,實時數據平臺整體上主要分為四個部分:基於 Kafka 構建的消息隊列,自研的消息處理中間件 Matcher,基於 SQL 開發方式的實時計算平臺 SQL on Streaming(SOS)以及以 Druid 為主體構建的實時數據查詢服務。

如上圖所示,FreeWheel 的廣告伺服器 ADS 部署在美國不同地區的 11 個 Data Center,例如 NYC,ASH 等,並將日誌數據以 PB 格式寫入本地的 Kafka 服務(LocalKafka),然後通過 MirrorMaker 等服務將數據同步到部署在 AWS 的 Global Kafka。Matcher 是 FreeWheel 自研的消息處理中間件,主要用於匹配廣告投放過程中的 Request 和 Ack,並進行必要的數據去重。SOS 基於 Spark Streaming 和 SparkSQL,對實時應用開發中依賴的數據讀寫進行抽象及封裝,並支持消費端位點管理,Join 維度數據,寫數據冪等,數據模型依賴,彈性伸縮等特性。用戶只需關心讀寫端元信息及用 SQL 表達計算邏輯而無需關心其他細節,同時為了 方便用戶使用,結合 CICD 等為用戶構建功能,性能以及回歸測試服務,使用戶的使用儘量趨於自助化(Self-Service)。實時數據處理的下遊是基於 Druid 構建的數據存儲及查詢服務,並自研針對 Druid Segment 小文件的合併工具,基於實際流量進行數據攝入自動伸縮容,用戶不友好查詢的追蹤分析等周邊服務,保障高服務質量。同時,對於數據 Join 需求較高的場景,通過實現 Presto Druid Connector 來覆蓋這一部分的需求。

3

FreeWheel 實時計算彈性伸縮

現狀及需求

由於 FreeWheel 流量高低時段相對比較規律,因此最早我們結合 Jenkins Job,按時段規律定期的對 SOS 的應用進行伸縮。這種方式會以 Graceful 的方式重啟應用,在相當一段時間裡可以滿足應用計算資源的按需增減以及節約整體計算成本(Cost Saving)的要求。然而隨著平臺的發展,有些問題開始逐漸凸顯,主要有兩個:一是對於突發的流量增長,仍需工程師介入進行手動處理;二是這種定時啟停的方式會增加整個實時應用不可用的時間,對於實時數據這種 SLA 要求較高的系統並不友好。因此,使實時計算具備基於實際數據流量進行動態伸縮的能力成為我們必須要解決的問題,這也與 FreeWheel 數據系統 Self-Service 的理念保持一致。

Spark Streaming Dynamic ResourceAllocation

由於歷史及技術棧的原因,SOS 基於 Spark 生態的技術構建並運行在 Yarn 上。對於 Spark 的動態資源調度,大家可能知道 Dynamic Resource Allocation,它的原理比較直觀,在 ExecutorAllocationManager 組件中啟動一個周期性任務監聽 Executor 狀態,如果 Executor 在一段時間內一直處於空閒狀態,則殺掉該 Executor,如果 Executor 一直處於高負載的狀態,則增加 Executor 數量。與此同時,為保障處理性能的穩定並且對 Executor 的增減進行約束,可以定義 Executor 數量的最大及最小值。Dynamic Resource Allocation 這個特性常常被應用在 Spark 離線計算中。而怎樣殺死或者新增 Executor 呢?如下圖所示,藉助於內嵌在 ExecutorAllocationManager 中的 ExecutorAllocationClient,可以將計算出的 Executor 的新數量發送給 ClusterManager,由後者完成計算資源的重新調度。

實際上,SparkStreaming 也有 Dynamic ResourceAllocation 的特性,不過在官方文檔中鮮有提及。它的原理類似,在 Streaming 的 ExecutorAllocationManager 中同樣啟動一個周期運行的監聽任務,它定時的判斷當前計算資源是否充足,以此來計算是否要新增或者殺死 Executor,而判斷的依據源於計算一個 Ratio 值,Ratio 值的計算依賴於 Spark 用於流式計算過程中收集 Metrics 的組件 StreamingListener。藉助於 StreamingListener,可以在 Spark Streaming 每個 Batch 結束後獲取關於該 Batch 處理的一些 Metrics,例如調度延遲(SchedulingDelay),處理延遲(ProcessingDelay),處理條數(NumRecords)等。

假設當前 Spark Streaming 應用的 BatchDuration 是 30 秒,配置的周期性調度的時間是 2 分鐘,那麼每次調度時,我們可以拿到過去 4 個 Batch 的處理情況並計算出處理延遲的平均值,前邊提到的 Ratio 值的算法就是用平均處理延遲除以 Batch 時間得到,如下圖所示。假設過去 4 個 Batch 的平均處理延遲是 20 秒,我們可以估算出目前應用的處理能力大約還有三分之一的彈性空間。我們可以設置彈性伸縮相關的閾值 Scale Up 和 ScaleDown,那麼當 Ratio 的值大於 Scale Up 閾值,應用會向 Spark 的 Cluster Manager 發送請求申請新增 Executor,當小於 Scale Down 閾值時,應用會從當前 Executor 中選擇不包含 Receiver 的 Executor 並從中隨機選取一個,然後發送請求殺死該 Executor。同樣的,可以定義 Executor 數量的最大最小值。以上就是原生 Spark Streaming Dynamic Resource Allocation 的基本原理。需要注意的是,新增 Executor 時,需要保證運行應用的資源池資源足夠,例如假設應用運行在 Yarn 上,此時需要保證應用所在的隊列資源足夠。

SOS Dynamic Resource Allocation

SOS DynamicResource Allocation(簡稱 SOS DRA)在繼承原生特性的基礎上,有幾點定製化的實現。首先思考一個場景,假設 SOS 應用拉取 Kafka 數據的能力已接近瓶頸,基於當前的計算資源算出的平均處理延遲會趨於一個恆定值,假如此時數據生產的速度大於處理速度,那麼整體來講,SOS 消費數據的 Lag 仍在不斷增長。因此,對於前文提到的 Ratio 值的計算,我們引入當前應用的消費 Lag,結合平均處理延遲來加權計算 Ratio。Ratio 的計算邏輯如下邊公式所示:

其中, timeBasedRatio 為原始計算邏輯,lagBasedRatio 由當前消費 Lag 的值除以過去一輪 SOS DRA 調度中每個 Batch 處理的數據條數的平均值得到。timeWeight,lagWeight 分別為兩種 Ratio 對應的權重。因為 SOS DRA 依賴外部服務,那麼自然存在外部依賴不可用的情況。為保障服務的健壯性,我們對可能存在的異常場景進行針對處理。例如,當依賴的 Rest 服務出現問題而無法拿到消費 Lag 信息,SOS DRA 的調度算法將退化到只基於平均處理延遲計算 Ratio 值,即當 Rest 服務出現問題時,lagWeight 調整為 0,timeWeight 調整為 1。此時,整個表達式退化為原始計算邏輯。

其次,由於數據基礎架構團隊的軟體服務都部署在 AWS EC2 上,因此 ScaleUp 和 Scale Down 會涉及 AWS EC2 資源的申請與釋放。數據基礎架構團隊結合 AWS Auto Scaling Groups 自研 AWS EC2 資源服務 Asger。當涉及新增及刪除 Executor 時,SOS DRA 會調用 Asger 服務,由後者完成 AWS EC2 資源的申請釋放以及 SOS 應用所在 Yarn 隊列的資源更新。第三,由於不想在 SOS DRA 中耦合更多邏輯,額外開發一個 Rest 服務,用於獲取當前的 Lag 信息以及配置一些其他的調度規則等。第四,將每個調度周期計算的 Ratio 值及更新後 Executor 數量暴露到 JMX,這樣這些信息最終會被採集並展現在基於 Datadog 構建的監控系統中用於實時追蹤當前的調度情況。SOS DRA 的原理如下圖。

實際部署效果如下圖示例,該示例中 SOS 應用啟動時指定 20 臺 Executor,並分別指定 ScaleUp 和 Scale Down 的閾值為 1.0 和 0.7。由下圖一圖二可見,分別在 12:45 左右,13:00 左右,當 Ratio 閾值小於 0.7 時,Executor 數量由 20 臺逐漸縮減至 17 臺。在接近 13:20,當遭遇突發流量波動,Ratio 值超過 1 時,Executor 數量由 17 臺彈回至 19 臺。當時間超過 13:35,Ratio 值再次小於 0.7 時,機器再次由 19 臺縮減至 18 臺。

4

Druid 實時數據攝入彈性伸縮

ApacheDruid 具備很多的優點,例如數據預聚合,低耦合架構,多種數據攝入方式,與大數據生態良好的親和性等。因此結合廣告業務特點,FreeWheel 實時數據系統下遊選擇 Druid 作為數據存儲及查詢的核心引擎。同時出於高可用性,維護友好等角度考慮,我們將 Druid 部署在 AWS EKS 上。

Druid 實時數據攝入原理

Druid 的數據攝入分為實時(Realtime)以及離線(Batch)兩種,整體呈現 Lambda 架構,Druid 架構中 MiddleManager 角色負責啟動 Peon 進程,由 Peon 來完成數據的真正攝入。實時數據的攝入主要針對上遊是 Kafka,Kinesis 等消息隊列的場景,Peon 集成的 Kafka Consumer Client 會 7*24 實時讀取上遊消息隊列中的數據並保障 Exactly Once 的語義。離線數據導入主要針對將 HDFS,S3 上的數據批量導入 Druid 的場景。

當需要將數據由 Kafka 導入 Druid 時,用戶需要指定一個 Supervisor 任務,通過定義 Payload 文件以 Json 格式來描述需要導入的信息,並提交給 Overload 節點,由 Overload 節點通知 MiddleManager 啟動 Peon 進程完成數據的攝入。任務提交後,交由 Overload 進程中的 SupervisorManager 對象處理,它對維護的 Supervisor 內部狀態做必要的更新然後生成 KafkaSupervisorSpec 並交由 KafkaSupervisor 處理。KafkaSupervisor 負責管理 IngestTask 的生命周期,它接收 KafkaSupervisorSpec 作為輸入,生成對應的 Tasks 並管理這些 Tasks 與 KafkaTopic 分區的映射關係。KafkaSupervisor 內部還包含一個處理線程,它不斷的消費 Notice 隊列的內容並調用對應的 handle 方法。Notice 是 Druid 中對 SupervisorOperation 的抽象表示,例如我們提交,暫停或者重置 Supervisor 時會分別對應 RunNotice,ShutdownNotice 和 ResetNotice。綜上可見,我們要實現實時數據攝入的動態伸縮,本質上是要幹預 Task 任務的生成。實時數據攝入的原理如下圖,其中紅色方框的部分為我們新增的部分,後文中將繼續介紹。

FreeWheel 對 Druid 實時數據攝入彈性伸縮的改進

由於 Druid 攝入 Kafka 數據是否及時可以根據消費 Lag 來直接判斷,因此我們按固定間隔採集 Lag 信息,基於此新增一種 Notice 的實現 DynamicAllocationTaskNotice 並將它插入到 Notice 隊列裡。具體地,在 KafkaSupervisor 中增加記錄 Lag 的定長隊列 CircularFifoQueue,按固定間隔插入採集到的 Lag 信息。當彈性伸縮的調度服務被定時觸發時,它會創建 DynamicAllocationTaskNotice。當 DynamicAllocationTaskNotice 的 handle 方法被調用時,它會判斷定長隊列裡 Lag 的情況,當超過 Scale Up 閾值的點的比例超過設置的閾值,則按指定的步長進行 Scale Up,反之,當小於 Scale Down 閾值的點的比例小於設置的閾值,則按指定的步長進行 Scale Down。同時,通過設定 Task 數量的最大值和最小值,來保障單個數據源佔用的資源在一定範圍內。原理圖如下:

在 Payload 文件中新增 dynamicAllocationTasksProperties 的配置模塊,該特性的具體新增配置屬性如下:

實際運行效果如下圖所示,在美國大選期間的某一時間段,突發一流量高峰,從圖一中可以看到某 Topic 在大約 11:20-13:20 之間流量從每秒不到 100K 條數據增長到最高值超過每秒 286K 條。圖二為該 Topic 對應的 Druid 數據攝入的 Task 數量,如圖中所示,大約在相同時間段,Task 數量從之前的 6 個自動擴展到 10 個,達到設定的最大值,在流量高峰過去後,自動縮回到之前 6 個的水平,整個過程無需工程師幹預。

我們目前實現的實時數據攝入動態伸縮是基於當前 MiddleManager 所能提供的 Task 總容量進行的,可以滿足線上需求。

5

未來計劃

隨著業務規模的不斷擴大,對於狀態需求較高的實時應用,例如支持 Late Data,SessionWindow 等,我們正在考慮為 SOS 支持 Flink 引擎。

整體架構層面,我們正在進行這幾個方面的實踐:構建流批融合的一站式平臺,通過梳理指標規範模型進一步降低用戶的開發成本,最大程度的支持整體平臺的用戶自助服務(Self-Service)。

作者介紹:

韓飛,Lead Software Engineer,FreeWheel。碩士畢業於清華大學軟體學院,目前就職於 FreeWheel 數據基礎架構團隊,任實時數據系統研發負責人。擁有多年數據平臺建設經驗,主要研究領域包括實時計算、實時數倉,OLAP、數據交換等。

今日薦文

相關焦點

  • Freewheel智能輪椅:行動不便人士專屬 還能繪製地形圖
    Freewheel智能輪椅:行動不便人士專屬 還能繪製地形圖   如果一個區域內使用這款Freewheel智能輪椅的人越多,那麼其收集到的數據將會越詳細和準確。
  • LarkXR實踐匯:3DCAT實時渲染雲平臺|Powered by 平行雲
    如今,隨著產品可視化、建築、工程、教育、醫療、遊戲、VR/AR等專業領域在3D交互應用方向的發展,對實時渲染的需求越來越多。由於實時渲染計算量巨大、對時效性要求非常嚴格。因此,把實時渲染算力轉移到雲端是解決這個難題的最佳途徑。實時雲渲染不僅可以提供海量的GPU算力,還實現了終端的輕量化和移動化。
  • 彈性ip是什麼 - CSDN
    *A.簡單接入B.統計簡單C.配置複雜D.多樣管理22.下列哪項不是彈性伸縮服務主要解決的問題?*A.包年B.包月C.按需付費D.免費24.下列關於彈性伸縮服務中,描述正確的是?*A.彈性伸縮服務中的伺服器採用特殊軟性材質生產B.彈性伸縮的收費形式包括按需付費和包年包月兩種C.彈性伸縮是一種可以根據伺服器壓力的不同自動增加或減少實例的服務D.以上皆為錯誤25.關於VPC,下列描述錯誤的是?
  • 真空斷路器的在線實時狀態監測系統
    介紹了真空斷路器的基本結構和工作原理,以現代狀態檢修概念為基礎,利用各種傳感器技術對真空斷路器進行在線工作狀態的數據採集,並對數據進行處理和判斷,實現對真空斷路器的在線實時狀態監測;並設計了一套真空斷路器在線監測系統。
  • 衢州傳力式伸縮接頭質量取勝
    衢州傳力式伸縮接頭質量取勝伸縮接頭成為建築市場主流,根據相關調查數據顯示,2019年僅華南地區新增企業就超過400家;其中,大部分為管道伸縮接頭企業,由此可以看出,市場對管道伸縮接頭的需求量相當之大。儘管過去的一年地產企業遭受了一定的約束,但從實踐客觀的數據不難看出2015年房價大有回暖之勢。以2011年一線城市房價下跌的特徵,當時跌了8個月,而目前一線城市成交量已逐步上升,那麼今年跌幅也極可能沿襲2011年的走勢,若按此歷史邏輯分析,那麼也基本會呈現出跌無可跌的態勢。
  • 康奈爾大學用可伸縮分布式光纖傳感器研製低成本觸覺手套
    來源:映維網 作者 廣州客康奈爾大學的研究人員日前發明了一種結合低成本發光二極體和染料的光纖傳感器,並帶來成一種可以檢測壓力、彎曲和應變等變形的可伸縮「皮膚」。這種傳感器可以用於機器人系統和XR等領域,並為其提供人類在自然界生存所依賴的豐富觸覺感受。
  • Apache Eagle:分布式實時 Hadoop 數據安全方案
    併集成機器學習對用戶行為建立Profile以實時智能地保護Hadoop生態系統中大數據安全的解決方案。Apache Eagle提供一套高效分布式的流式策略引擎,具有高實時、可伸縮、易擴展、交互友好等特點,同時集成機器學習對用戶行為建立Profile以實現實時智能實時地保護Hadoop生態系統中大數據的安全。背景隨著大數據的發展,越來越多的成功企業或者組織開始採取數據驅動商業的運作模式。
  • wheel,tyre和spin用法,spin one's wheel,轉動輪子比喻什麼?
    今天我們來學習wheel,tyre和spin的用法。請熟讀下面生活中常見情景例句到會說。01wheel 輪,車輪(汽車等的)方向盤,(輪船的)舵輪汽車She turned the wheel sharply to the left.她往左猛打方向盤。
  • 績溪模數組合式伸縮縫施工視頻
    因此,需採用設有螺栓彈簧的裝置來固定滑動鋼板,以減少拍擊和噪聲,該伸縮縫的構造相對複雜,就需要使用模數式橋梁伸縮縫。致使伸縮裝置不能正常工作。這樣會出現下列情況:由於縫距太小,橡膠伸縮縫因超限擠壓凸起而產生跳車;由於縫距過大,荷載作用下的剪切力以及車輛行駛的慣性,會將鬆動的伸縮縫橡膠帶出定位角鋼,產生了另一類型的跳車。
  • 基於FPGA的多級小波逆變換實時系統設計
    因此,一種廉價、有效、實時的解決方案,對於JPEG2000的推廣應用較為有利。本文針對JPEG2000解碼系統中核心處理模塊——離散小波逆變換(IDWT),採用提升小波算法,提出了一種雙路並行的實現結構,並基於Xilinx公司低功耗的xc2v3000-4-fg676晶片進行布局布線仿真驗證表明,該方案是一種高速、實時的硬體解決方案,能較好地解決JPEG200 0解碼系統中對於小波逆變換實時處理的瓶頸。
  • 可攜式明渠流量計比對裝置為什麼採用磁致伸縮傳感器?
    為了減輕相關工作人員的工作壓力,並能夠快速準確完成校準工作,提出了明渠流量測量比對採集成套系統。2019年12月24日,發布了《水汙染源在線監測系統(CODCr、NH3-N 等)安裝技術規範》和《水汙染源在線監測系統(CODCr、NH3-N 等)驗收技術規範》。
  • DTCC2020阿里雲李飛飛:雲原生分布式資料庫與數據倉庫系統點亮數據...
    從Gartner、IDC及各個傳統廠商分析中可以得到以下幾個結論:數據在爆炸性增長,非結構化數據的佔比越來越高;生產/處理實時化與智能化的需求越來越高,並追求離在線一體化;資料庫系統、大數據系統、數據管理分析系統等上雲的趨勢明顯
  • 金山雲推出KingStack藍光雲一體機 實現低成本高可靠數據存儲
    大量數據帶來的龐大的存儲成本,成為很多企業一項巨大的硬性支出,而大部分數據都具有非常顯著的生命周期屬性,這些歷史數據往往訪問頻度低、可利用性低,成為冷數據,若能低成本高可靠地將這些冷數據進行存儲,將能夠極大的節省存儲成本。
  • 磁致伸縮位移傳感器名詞解釋
    OFweek電子工程網 磁致伸縮位移傳感器(Magnetostrictive Position Sensor),是基於鐵磁性材料磁致伸縮效應而開發的一種具有特殊優點的位移檢測裝置,具有高可靠性、高解析度、非接觸測量、耐油抗汙等特殊優點,能在惡劣的工業環境下,對各種運動部件的位移(位置)、速度進行連續、精確、實時的檢測
  • 常熟風琴伸縮護罩生產廠家需求知道哪些數據
    常熟風琴伸縮護罩生產廠家需求知道哪些數據 ,「btgpt」   常熟風琴伸縮護罩生產廠家需求知道哪些數據    按裝置方式又可分為整體式、拉鏈式、粘扣式、摁扣式等,為用戶的裝置帶來更大的便利
  • 我問風扇磁致伸縮導波檢測技術你懂麼?它搖了一整夜頭!
    管道中磁致伸縮導波的激發基於正磁致伸縮效應,即鐵磁性材料在外加磁場的作用下,材料表面產生彈性形變,在介質中以導波的形式傳播。相應的,磁致伸縮導波的接收基於逆磁致伸縮效應,質點的振動導致鐵磁性材料產生彈性形變,進而導致其內部磁場發生變化。根據材料的形變方式,管道中的導波分為彎曲模態、扭轉模態和縱向模態。
  • ...雲開發即將發布實時數據推送服務,再也不用自己搭建WebSocket了
    受訪嘉賓 | 周子傑作者 | Yonie小程序雲開發 - 實時數據推送是小程序雲開發即將發布的一個雲服務, 可以監聽雲資料庫的數據變更,實時推送到小程序端。省去了開發者搭建 WebSocket 的成本,是小程序中實時推送的高效實踐方案。
  • 10月中旬起工程勘察數據實時上傳丨未應用勘察數據系統項目不能在...
    為提高我省工程勘察成果質量,規範勘察單位市場行為,近日省住建廳發布《關於啟用黑龍江省工程勘察數據系統的通知》。今後工程勘察單位在開展勘察活動時,利用手機APP對工程項目勘察數據進行影像留存,數據要實時上傳,在完成工程勘察作業後,應將勘察作業成果、勘察報告等信息錄入數據系統。
  • 基於STM32的光功率實時監測系統設計
    光功率檢測儀主要是對光纖中光的強度進行實時監測,是光纜自動檢測和光路切換系統中的關鍵設備,其性能的好壞直接影響系統功能的可靠性和穩定性。設計是基於ST公司Cortex—M3內核的32位ARM產品,以STM32F103ZET6為主晶片設計的一款光功率檢測系統。通過建立TCP連結,實現GPRS數據的傳輸,做到了隨時隨地進行實時監測。同時,系統設計是基於CAN總線的嵌入式功率實時監測系統,不僅能夠智能地監測多個通道的光纖網絡,還具備了其系統的抗幹擾性和可靠性。
  • 實時監測預警系統實施
    (3)實現危險源企業通過網絡進行數據的實時申報,達到安監快速、有效審核,及時管理的要求,同時系統能自動完成重大危險源的分級評估。  (4) 整合安監系統內部資源、深化業務應用,重點解決了數據質量參差不齊,數據錯誤、數據冗餘等問題。