論SparkStreaming的數據可靠性和一致性

2020-12-22 CSDN技術社區

2Driver HA

由於流計算系統是長期運行、且不斷有數據流入,因此其Spark守護進程(Driver)的可靠性至關重要,它決定了Streaming程序能否一直正確地運行下去。

Driver實現HA的解決方案就是將元數據持久化,以便重啟後的狀態恢復。如圖一所示,Driver持久化的元數據包括:

Block元數據(圖1中的綠色箭頭):Receiver從網絡上接收到的數據,組裝成Block後產生的Block元數據;

Checkpoint數據(圖1中的橙色箭頭):包括配置項、DStream操作、未完成的Batch狀態、和生成的RDD數據等;

Driver失敗重啟後:

恢復計算(圖2中的橙色箭頭):使用Checkpoint數據重啟driver,重新構造上下文並重啟接收器。

恢復元數據塊(圖2中的綠色箭頭):恢復Block元數據。

恢復未完成的作業(圖2中的紅色箭頭):使用恢復出來的元數據,再次產生RDD和對應的job,然後提交到Spark集群執行。

通過如上的數據備份和恢復機制,Driver實現了故障後重啟、依然能恢復Streaming任務而不丟失數據,因此提供了系統級的數據高可靠。

可靠的上下遊IO系統

流計算主要通過網絡socket通信來實現與外部IO系統的數據交互。由於網絡通信的不可靠特點,發送端與接收端需要通過一定的協議來保證數據包的接收確認和失敗重發機制。

不是所有的IO系統都支持重發,這至少需要實現數據流的持久化,同時還要實現高吞吐和低時延。在SparkStreaming官方支持的data source裡面,能同時滿足這些要求的只有Kafka,因此在最近的SparkStreaming release裡面,也是把Kafka當成推薦的外部數據系統。

除了把Kafka當成輸入數據源(inbound data source)之外,通常也將其作為輸出數據源(outbound data source)。所有的實時系統都通過Kafka這個MQ來做數據的訂閱和分發,從而實現流數據生產者和消費者的解耦。

一個典型的企業大數據中心數據流向視圖如圖3所示:

除了從源頭保證數據可重發之外,Kafka更是流數據Exact Once語義的重要保障。Kafka提供了一套低級API,使得client可以訪問topic數據流的同時也能訪問其元數據。SparkStreaming每個接收的任務都可以從指定的Kafka topic、partition和offset去獲取數據流,各個任務的數據邊界很清晰,任務失敗後可以重新去接收這部分數據而不會產生「重疊的」數據,因而保證了流數據「有且僅處理一次」。

可靠的接收器

在Spark 1.3版本之前,SparkStreaming是通過啟動專用的Receiver任務來完成從Kafka集群的數據流拉取。

Receiver任務啟動後,會使用Kafka的高級API來創建topicMessageStreams對象,並逐條讀取數據流緩存,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務。

由於Receiver任務存在宕機風險,因此Spark提供了一個高級的可靠接收器-ReliableKafkaReceiver類型來實現可靠的數據收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批數據持久化到磁碟後,更新topic-partition的offset信息,再去接收下一批Kafka數據。萬一Receiver失敗,重啟後還能從WAL裡面恢復出已接收的數據,從而避免了Receiver節點宕機造成的數據丟失(以下代碼刪除了細枝末節的邏輯):

啟用WAL後,雖然Receiver的數據可靠性風險降低了,但卻由於磁碟持久化帶來的開銷,系統整體吞吐率會明顯下降。因此,最新發布的Spark 1.3版本,SparkStreaming增加了使用Direct API的方式來實現Kafka數據源的訪問。

引入了Direct API後,SparkStreaming不再啟動常駐的Receiver接收任務,而是直接分配給每個Batch及RDD最新的topic partition offset。job啟動運行後Executor使用Kafka的simple consumer API去獲取那一段offset的數據。

這樣做的好處不僅避免了Receiver宕機帶來數據可靠性的風險,也由於避免使用ZooKeeper做offset跟蹤,而實現了數據的精確一次性(以下代碼刪除了細枝末節的邏輯):

預寫日誌 Write Ahead Log

Spark 1.2開始提供預寫日誌能力,用於Receiver數據及Driver元數據的持久化和故障恢復。WAL之所以能提供持久化能力,是因為它利用了可靠的HDFS做數據存儲。

SparkStreaming預寫日誌機制的核心API包括:

管理WAL文件的WriteAheadLogManager

讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader

基於WAL的RDD:WriteAheadLogBackedBlockRDD

基於WAL的Partition:WriteAheadLogBackedBlockRDDPartition

以上核心API在數據接收和恢復階段的交互示意圖如圖4所示。

從WriteAheadLogWriter的源碼裡可以清楚看到,每次寫入一塊數據buffer到HDFS後都會調用flush方法去強制刷入磁碟,然後才去取下一塊數據。因此receiver接收的數據是可以保證持久化到磁碟了,因而做到較好的數據可靠性。

結束語

得益於Kafka這類可靠的data source以及自身的checkpoint/WAL等機制,SparkStreaming的數據可靠性得到了很好的保證,數據能保證「至少一次」(at least once)被處理。但由於其outbound端的一致性實現還未完善,因此Exact once語義仍然不能端到端保證。SparkStreaming社區已經在跟進這個特性的實現(SPARK-4122),預計很快將合入trunk發布。


作者簡介:葉琪,華為軟體公司Universe產品部高級架構師,專注於大數據底層分布式存儲和計算基礎設施,是華為軟體公司Hadoop發行版的主要架構師,目前興趣點在流計算與Spark。

 

本文選自程式設計師電子版2015年6月A刊,該期更多文章請查看這裡。2000年創刊至今所有文章目錄請查看程式設計師封面秀。歡迎訂閱程式設計師電子版(含iPad版、Android版、PDF版)。

本文為程式設計師雜誌原創文章,未經允許不得轉載,如需轉載請聯繫market#csdn.net(#換成@)

相關焦點

  • 是時候放棄Spark Streaming,轉向Structured Streaming了
    DStream 只能保證自己的一致性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Streaming 輸出到外部存儲的語義往往需要用戶自己來保證。
  • spark streaming流處理入門乾貨,傾力奉獻
    批處理和流處理是大數據處理最常見的兩個場景。那麼作為當下最流行的大數據處理平臺之一,Spark當然也支持流數據處理。02spark streaming概述Spark Streaming 提供一個對於流數據的抽象 DStream。
  • Spark Streaming 和 Flink 誰是數據開發者的最愛?
    Flink 與 Kafka 結合是事件驅動,大家可能對此會有疑問,消費 Kafka 的數據調用 Poll 的時候是批量獲取數據的(可以設置批處理大小和超時時間),這就不能叫做事件觸發了。而實際上,Flink 內部對 Poll 出來的數據進行了整理,然後逐條 emit,形成了事件觸發的機制。
  • 大數據入門:Spark Streaming實際應用
    二、Sparkstreaming應用場景基於Spark Streaming優秀的性能表現,在很多的企業級應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等,都能夠給出合理的解決方案。
  • Spark實戰第二版(涵蓋Spark3.0)-第十章 通過結構化流接入數據
    本章涵蓋從總覽的角度看你的數據,專注於數據生成部分。您看到的是生成批量數據的系統,還是連續生成數據的系統?提供數據流(也稱為streams)的系統幾年前不太流行。streams肯定會得到更多的關注,理解streams是本章的重點。例如,您的手機會定期ping手機發射塔。如果是智慧型手機(本書的讀者群,很有可能),它將同時檢查電子郵件和其他內容。
  • 『 Spark 』13. Spark 2.0 Release Notes 中文版
    寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,所以一切以能夠理解為主,沒有必要的細節就不會記錄了,而且文中有時候會出現英文原版文檔,只要不影響理解,都不翻譯了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本號還是必要的。
  • 【大數據嗶嗶集20210117】Spark面試題靈魂40問
    ; 5)checkpoint和persist,數據計算之後持久化緩存; 6)數據調度彈性,DAG TASK調度和資源無關; 1)不支持細粒度的寫和更新操作(如網絡爬蟲),spark寫數據是粗粒度的。spark並行度,每個core承載24個partition,如,32個core,那麼64128之間的並行度,也就是設置64~128個partion,並行讀和數據規模無關, 只和內存使用量和cpu使用時間有關。29、collect功能是什麼,其底層是怎麼實現的?
  • 走進大數據丨 公安大數據犯罪嫌疑人實時過濾
    隨著整體大數據時代的發展,大數據已經不光是在電商領域發展了, 已經開始進軍智慧政務領域那麼今天就帶著同學們學習一下人工智慧攝像頭智能捕捉犯罪嫌疑人的實戰 package;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext
  • 大數據分析工程師入門9-Spark SQL
    早期Spark的切入點是SparkContext,通過它來創建和操作數據集,對於不同的API需要不同的context。比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。
  • 數據說話:大數據處理引擎Spark與Flink比拼
    Flink 使用異步的 checkpoint 機制來達到任務狀態的可恢復性,以保證處理的一致性,所以在處理的主流程上可以做到數據源和輸出之間數據完全不用落盤,達到更高的性能和更低的延遲。  數據處理場景  除了批處理之外,Spark 還支持實時數據流處理、交互式查詢和機器學習、圖計算等。
  • 《Spark編程基礎(Python版)》
    教材官網為 http://dblab.xmu.edu.cn/post/spark-python/ 。認識林子雨老師是被他的數字教師和實驗室主頁震撼,他幾乎事無巨細地記錄了工作學習中的點點滴滴,這還是很少見的。數字教師:基礎數據+對外智能服務。
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    概述相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上的運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。DataFrame接口可以處理多種數據源,Spark SQL也內建支持了若干種極有用的數據源格式(包括json、parquet和jdbc,其中parquet是默認格式)。此外,當你使用SQL查詢這些數據源中的數據並且只用到了一部分欄位時,Spark SQL可以智能地只掃描這些用到的欄位。
  • Structured Streaming與Flink比較
    Streaming都是基於微批處理的,不過現在Spark Streaming已經非常穩定基本都沒有更新了,然後重點移到spark sql和structured Streaming了。對於基於事件時間的處理flink和Structured Streaming都是支持watemark機制,窗口操作基於watermark和事件時間可以對滯後事件做相應的處理,雖然聽起來這是個好事,但是整體來說watermark就是雞肋,它會導致結果數據輸出滯後,比如watermark是一個小時,窗口一個小時,那麼數據輸出實際上會延遲兩個小時,這個時候需要進行一些處理。
  • 2020大數據面試題真題總結(附答案)
    34.Sparkstreaming和flink做實時處理的區別35.Sparkcontext的作用36.Sparkstreaming讀取kafka數據為什麼選擇直連方式37.離線分析什麼時候用sparkcore和sparksql38.Sparkstreaming實時的數據不丟失的問題39.簡述寬依賴和窄依賴概念,groupByKey
  • 一、Spark概述
    mapreduce中map和reduce任務都是以進程的方式運行著,而spark中的job是以線程方式運行在進程中。2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 代碼 | Spark讀取mongoDB數據寫入Hive普通表和分區表
    System.out.println("數據插入到Hive ordinary table");            Long t1 = System.currentTimeMillis();            spark.sql("insert into mgtohive_2 " + querysql + " " + "where b.id not in (select id
  • 圖解SparkStreaming與Kafka的整合,細節滿滿
    前言老劉是一名即將找工作的研二學生,寫博客一方面是複習總結大數據開發的知識點,一方面是希望幫助更多自學的小夥伴。由於老劉是自學大數據開發,肯定會存在一些不足,還希望大家能夠批評指正,讓我們一起進步!我們要知道Spark作為實時計算框架,它僅僅涉及到計算,並沒有涉及到數據的存儲,所以我們後期需要使用spark對接外部的數據源。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • Spark 數據傾斜及其解決方案
    TIPS在 Spark streaming 程序中,數據傾斜更容易出現,特別是在程序中包含一些類似 sql 的 join、group 這種操作的時候。因為 Spark Streaming 程序在運行的時候,我們一般不會分配特別多的內存,因此一旦在這個過程中出現一些數據傾斜,就十分容易造成 OOM。
  • 如何將 MapReduce 轉化為 Spark
    ;基於實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。但是對於其他類型的計算,比如交互式和流式計算,MapReduce 並不適合。這也導致了大量的不同於 MapReduce 的專有的數據處理模型的出現,比如 Storm、Impala 等等。