論SparkStreaming的數據可靠性和一致性

2021-01-09 CSDN技術社區

2Driver HA

由於流計算系統是長期運行、且不斷有數據流入,因此其Spark守護進程(Driver)的可靠性至關重要,它決定了Streaming程序能否一直正確地運行下去。

Driver實現HA的解決方案就是將元數據持久化,以便重啟後的狀態恢復。如圖一所示,Driver持久化的元數據包括:

Block元數據(圖1中的綠色箭頭):Receiver從網絡上接收到的數據,組裝成Block後產生的Block元數據;

Checkpoint數據(圖1中的橙色箭頭):包括配置項、DStream操作、未完成的Batch狀態、和生成的RDD數據等;

Driver失敗重啟後:

恢復計算(圖2中的橙色箭頭):使用Checkpoint數據重啟driver,重新構造上下文並重啟接收器。

恢復元數據塊(圖2中的綠色箭頭):恢復Block元數據。

恢復未完成的作業(圖2中的紅色箭頭):使用恢復出來的元數據,再次產生RDD和對應的job,然後提交到Spark集群執行。

通過如上的數據備份和恢復機制,Driver實現了故障後重啟、依然能恢復Streaming任務而不丟失數據,因此提供了系統級的數據高可靠。

可靠的上下遊IO系統

流計算主要通過網絡socket通信來實現與外部IO系統的數據交互。由於網絡通信的不可靠特點,發送端與接收端需要通過一定的協議來保證數據包的接收確認和失敗重發機制。

不是所有的IO系統都支持重發,這至少需要實現數據流的持久化,同時還要實現高吞吐和低時延。在SparkStreaming官方支持的data source裡面,能同時滿足這些要求的只有Kafka,因此在最近的SparkStreaming release裡面,也是把Kafka當成推薦的外部數據系統。

除了把Kafka當成輸入數據源(inbound data source)之外,通常也將其作為輸出數據源(outbound data source)。所有的實時系統都通過Kafka這個MQ來做數據的訂閱和分發,從而實現流數據生產者和消費者的解耦。

一個典型的企業大數據中心數據流向視圖如圖3所示:

除了從源頭保證數據可重發之外,Kafka更是流數據Exact Once語義的重要保障。Kafka提供了一套低級API,使得client可以訪問topic數據流的同時也能訪問其元數據。SparkStreaming每個接收的任務都可以從指定的Kafka topic、partition和offset去獲取數據流,各個任務的數據邊界很清晰,任務失敗後可以重新去接收這部分數據而不會產生「重疊的」數據,因而保證了流數據「有且僅處理一次」。

可靠的接收器

在Spark 1.3版本之前,SparkStreaming是通過啟動專用的Receiver任務來完成從Kafka集群的數據流拉取。

Receiver任務啟動後,會使用Kafka的高級API來創建topicMessageStreams對象,並逐條讀取數據流緩存,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務。

由於Receiver任務存在宕機風險,因此Spark提供了一個高級的可靠接收器-ReliableKafkaReceiver類型來實現可靠的數據收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批數據持久化到磁碟後,更新topic-partition的offset信息,再去接收下一批Kafka數據。萬一Receiver失敗,重啟後還能從WAL裡面恢復出已接收的數據,從而避免了Receiver節點宕機造成的數據丟失(以下代碼刪除了細枝末節的邏輯):

啟用WAL後,雖然Receiver的數據可靠性風險降低了,但卻由於磁碟持久化帶來的開銷,系統整體吞吐率會明顯下降。因此,最新發布的Spark 1.3版本,SparkStreaming增加了使用Direct API的方式來實現Kafka數據源的訪問。

引入了Direct API後,SparkStreaming不再啟動常駐的Receiver接收任務,而是直接分配給每個Batch及RDD最新的topic partition offset。job啟動運行後Executor使用Kafka的simple consumer API去獲取那一段offset的數據。

這樣做的好處不僅避免了Receiver宕機帶來數據可靠性的風險,也由於避免使用ZooKeeper做offset跟蹤,而實現了數據的精確一次性(以下代碼刪除了細枝末節的邏輯):

預寫日誌 Write Ahead Log

Spark 1.2開始提供預寫日誌能力,用於Receiver數據及Driver元數據的持久化和故障恢復。WAL之所以能提供持久化能力,是因為它利用了可靠的HDFS做數據存儲。

SparkStreaming預寫日誌機制的核心API包括:

管理WAL文件的WriteAheadLogManager

讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader

基於WAL的RDD:WriteAheadLogBackedBlockRDD

基於WAL的Partition:WriteAheadLogBackedBlockRDDPartition

以上核心API在數據接收和恢復階段的交互示意圖如圖4所示。

從WriteAheadLogWriter的源碼裡可以清楚看到,每次寫入一塊數據buffer到HDFS後都會調用flush方法去強制刷入磁碟,然後才去取下一塊數據。因此receiver接收的數據是可以保證持久化到磁碟了,因而做到較好的數據可靠性。

結束語

得益於Kafka這類可靠的data source以及自身的checkpoint/WAL等機制,SparkStreaming的數據可靠性得到了很好的保證,數據能保證「至少一次」(at least once)被處理。但由於其outbound端的一致性實現還未完善,因此Exact once語義仍然不能端到端保證。SparkStreaming社區已經在跟進這個特性的實現(SPARK-4122),預計很快將合入trunk發布。


作者簡介:葉琪,華為軟體公司Universe產品部高級架構師,專注於大數據底層分布式存儲和計算基礎設施,是華為軟體公司Hadoop發行版的主要架構師,目前興趣點在流計算與Spark。

 

本文選自程式設計師電子版2015年6月A刊,該期更多文章請查看這裡。2000年創刊至今所有文章目錄請查看程式設計師封面秀。歡迎訂閱程式設計師電子版(含iPad版、Android版、PDF版)。

本文為程式設計師雜誌原創文章,未經允許不得轉載,如需轉載請聯繫market#csdn.net(#換成@)

相關焦點

  • Spark Streaming:大規模流式數據處理的新貴
    基於歷史數據的交互式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間。 基於實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。
  • 深入對比數據科學工具箱: SparkR vs Sparklyr
    Sparklyr 由 RStudio 社區維護,通過深度集成 RStudio 的方式,提供更易於擴展和使用的方法,更強調統計特性與機器學習,實現本地與分布式代碼的一致性,通常會比 SparkR 延遲1-2個版本,從使用上看接近於dplyr。由於 SparkR 與 Sparklyr 都是 Spark API 的封裝,故二者在計算性能上沒有顯著差異。
  • 大數據分析工程師入門9-Spark SQL
    早期Spark的切入點是SparkContext,通過它來創建和操作數據集,對於不同的API需要不同的context。比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。
  • Talking Data 閻志濤:流式大數據和即時交互分析技術
    流式計算數據是流動的,原來我做大數據的時候是在做很多的IT業務系統,那時候的數據是怎麼做的,一般的情況下ROTB是數據入庫,是為了保障交易的一致性,同時會有LOG產生,LOG到磁碟,LOG到磁碟之後是為了做事後分析,所以那個時候一般的有什麼數倉,也就是說先有ROT就有了RVP,RVP是為了做報告,運行時的數據產生存到磁碟,為日後的消費去做業務洞察去做準備,這是過去一種方式。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • 【大數據】最新大數據學習路線(完整詳細版】
    ,redis)Spark(scala,spark,spark core,spark sql,spark streaming,spark mllib,spark graphx)Python(python,spark python)?
  • 大數據掃盲——什麼是spark
    關於大數據技術之前的文章裡已經提到了HDFS和MapReduce。HDFS解決了大數據的存儲問題,MapReduce解決了大數據的運算問題。既能存儲又能運算,貌似這樣已經很完美了。spark的出現就彌補了MapReduce的不足。 spark是一種基於內存的快速、通用、可擴展的大數據計算引擎。它集批處理、實時流處理、交互式查詢、圖計算與機器學習於一體Spark應用場景批處理可用於ETL(抽取、轉換、加載)。 機器學習可用於自動判斷淘寶的買家評論是好評還是差評。 交互式分析可用於查詢Hive數據倉庫。
  • Spark在360商業數據部的應用實踐
    由於之前大部分數據分析工作都是通過使用hive命令行完成的,為了將遷移至SparkSQL的代價最小,360系統部的同事開發了SparkSQL的命令行版本spark-hive。原有的以hive 命令運行的腳本,簡單的改成spark-hive便可以運行。360系統部的同事也做了大量兼容性的工作。spark-hive目前已經比較穩定,成為數據分析的首選。
  • 為什麼說,大數據是從流式計算開始切入的?
    流計算平臺的發展歷程2014年到現在4年多的發展歷程,經歷storm->spark streaming->flink的轉變,目前在轉變中。2. storm及spark streaming的缺點&我們為什麼選擇flink?
  • 每個數據科學家都得會一點SparkMagic
    商業數據科學家80%的時間都花在查找、清洗和準備數據上,這是數據科學家工作中效率最低也是最可怕的部分。網際網路為如何打破數據科學的80/20定律提供了許多的意見,但卻收效甚微。其實,數據科學家生產率低下的主要原因在於數據準備工作的雙重性:· 快速訪問、合併和聚合存儲在企業數據湖中的大數據· 探索和可視化數據中具有複雜依賴關係的Python數據包中的數據和統計信息大數據大多是非結構化的,常常存儲在具有企業管理和安全限制的生產環境中。
  • 基於Bert和通用句子編碼的Spark-NLP文本分類
    類別取決於所選的數據集,並且可以從主題開始。每一個文本分類問題都遵循相似的步驟,並用不同的算法來解決。更不用說經典和流行的機器學習分類器,如隨機森林或Logistic回歸,有150多個深度學習框架提出了各種文本分類問題。文本分類問題中使用了幾個基準數據集,可以在nlpprogress.com上跟蹤最新的基準。以下是關於這些數據集的基本統計數據。
  • 手把手教你在本機安裝spark
    本文轉載自【微信公眾號:五角錢的程式設計師,ID:xianglin965】,經微信公眾號授權轉載,如需轉載與原文作者聯繫今天是spark系列的第一篇文章。最近由於一直work from home節省了很多上下班路上的時間,加上今天的LeetCode的文章篇幅較小,所以抽出了點時間加更了一篇,和大家分享一下最近在學習的spark相關的內容。
  • 【瘋狂寫作06 材料篇】 論說文之多樣性和一致性
    同學們  一起練寫作啊  今日話題—論說文  根據以下材料,自擬題目,寫一篇700 字左右的論說文。  材料  亞里斯多德說:「城邦的本質在於多樣性,而不在於一致性……無論是家庭還是城邦,它們內部都有著一定的一致性。不然的話,它們是不可能組建起來的。  但這種一致性是有一定限度的……同一種聲音無法實現和諧,同一個音階也無法組成旋律。城邦也是如此,它是一個多面體。
  • 停止使用Pandas並開始使用Spark+Scala
    為什麼數據科學家和工程師應該考慮將Spark與Scala結合使用以替代Pandas,以及如何入門    以數據工程師的經驗,我發現在Pandas中建立數據管道經常需要我們定期增加資源,以跟上不斷增加的內存使用量。 此外,由於意外的數據類型或空值,我們經常會看到許多運行時錯誤。 通過將Spark與Scala結合使用,解決方案感覺更強大,重構和擴展更容易。
  • 5G和北鬥結合提高測量可靠性
    同2005年相比,此次珠峰高程測量的科學性、可靠性、創新性明顯提高,主要體現在7個方面。將我國自主研製、擁有完全自主智慧財產權的北鬥衛星導航系統首次應用於珠峰峰頂大地高的計算,獲取了更長觀測時間、更多衛星觀測數據。
  • SparkCore——專業術語及流程圖
    1,Applicationapplication(應用)其實就是用spark-submit提交的程序。比方說spark examples中的計算pi的SparkPi。一個application通常包含三部分:從數據源(比方說HDFS)取數據形成RDD,通過RDD的transformation和action進行計算,將結果輸出到console或者外部存儲(比方說collect收集輸出到console)。2,DriverSpark中的driver感覺其實和yarn中Application Master的功能相類似。
  • sparksql 窗口函數原理
    一、窗口函數是啥在單表數據操作中,一般有下面兩種操作範式:針對單條數據的映射操作,例如每條數據加一的時候。將數據分組後的聚合操作,例如進行分組統計的時候。在第一種範式中有這樣一種情況,當你要生成某條目標數據的時候你需要用到前後N條數據參與計算。
  • 什麼是一致性哈希算法
    ---數據分片與路由當數據量很大時,通過改善單機硬體資源的縱向擴充方式來存儲數據變得越來越不適用,而通過增加機器數目來獲得水平橫向擴展的方式則越來越流行。因此,就有個問題,如何將這些海量的數據分配到各個機器中?數據分布到各個機器存儲之後,又如何進行查找?這裡主要記錄一致性Hash算法如何將數據分配到各個機器中去。
  • 聊聊分布式系統的數據一致性
    數據多副本間的一致性存儲系統是千差萬別的,可以拿來存放視頻這種動輒幾個G的大文件,也可以存放幾KB的KV鍵值對數據,還可能是MySQL這種關係型的資料庫。雖然上層數據結構千差萬別,在保證數據多副本一致性方面,都逃不出一個基本方式,狀態機複製(state machine replication)。
  • 數據圖表可視化的配色一致性原則
    在論證中,顏色的使用應該基於數據,而不是個人偏好或品牌顏色。接下來我們使用數據分析系統DataFocus所製作的圖表來進行相應的演示。通常,我們在使用顏色時可以遵循以下顏色一致性原則:1.數字指標的一致性當基於某個指標的數值執行顏色映射時,建議使用兩極顏色生長模式的漸變顏色。例如,統計不同地區的銷售情況。左側圖像的顏色沒有顏色系統和生長規律。用戶很難理解特定索引值的含義。此時,如果使用右側的生長顏色系統。