是時候放棄Spark Streaming,轉向Structured Streaming了

2021-02-19 AI前線
來源 | 授權轉載自微信公眾號 legendtkl AI 前線導讀:Spark 團隊對 Spark Streaming 的維護將會越來越少,Spark 2.4 版本的 Release Note 裡面甚至一個 Spark Streaming 相關的 ticket 都沒有。相比之下,Structured Streaming 有將近十個 ticket 說明。所以,是時候捨棄 Spark Streaming 轉向 Structured Streaming 了,當然理由並不止於此。

今天這篇文章將重點分析 Spark Streaming 的不足,以及 Structured Streaming 的設計初衷和思想。文章主要參考 2018 年 Sigmod 上面的論文:《Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark》。

更多優質內容請關注微信公眾號「AI 前線」(ID:ai-front)

首先可以注意到論文標題中的 Declarative API,中文一般叫做聲明式編程 API。一般直接看到這個詞可能不知道什麼意思,但是當我們列出它的對立單詞:Imperative API,中文一般叫命令式編程 API,仿佛一切都明了了。是的,沒錯,Declarative 只是表達出我們想要什麼,而 Imperative 則是說為了得到什麼我們需要做哪些東西一個個說明。舉個例子,我們要一個糕點,去糕點店直接定做,告訴店員我們要什麼樣式的糕點,然後店員去給我們做出來,這就是 Declarative。而 Imperative 對應的就是麵粉店了。

在開始正式介紹 Structured Streaming 之前有一個問題還需要說清楚,就是 Spark Streaming 存在哪些不足?總結一下主要有下面幾點:

使用 Processing Time 而不是 Event Time

首先解釋一下,Processing Time 是數據到達 Spark 被處理的時間,而 Event Time 是數據自帶的屬性,一般表示數據產生於數據源的時間。比如 IoT 中,傳感器在 12:00:00 產生一條數據,然後在 12:00:05 數據傳送到 Spark,那麼 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。我們知道 Spark Streaming 是基於 DStream 模型的 micro-batch 模式,簡單來說就是將一個微小時間段,比如說 1s 的流數據當成批數據來處理。如果我們要統計某個時間段的一些數據,毫無疑問應該使用 Event Time,但是因為 Spark Streaming 的數據切割是基於 Processing Time,這樣就導致使用 Event Time 特別困難。

這點比較好理解,DStream (Spark Streaming 的數據模型)提供的 API 類似 RDD 的 API,非常 Low level。當我們編寫 Spark Streaming 程序的時候,本質上就是要去構造 RDD 的 DAG 執行圖,然後通過 Spark Engine 運行。這樣導致的一個問題是,DAG 可能會因為開發者的水平參差不齊而導致執行效率上的天壤之別。這樣導致開發者的體驗非常不好,也是任何一個基礎框架不想看到的(基礎框架的口號一般都是:你們專注於自己的業務邏輯就好,其他的交給我)。這也是很多基礎系統強調 Declarative 的一個原因。

Reason about end-to-end application

這裡的 end-to-end 指的是直接 input 到 out,比如 Kafka 接入 Spark Streaming 然後再導出到 HDFS 中。DStream 只能保證自己的一致性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Streaming 輸出到外部存儲的語義往往需要用戶自己來保證。而這個語義保證寫起來也非常有挑戰性,比如為了保證 output 的語義是 exactly-once,需要 output 的存儲系統具有冪等的特性,或者支持事務性寫入,這對於開發者來說都不是一件容易的事情。

儘管批流本是兩套系統,但是這兩套系統統一起來確實很有必要,我們有時候確實需要將我們的流處理邏輯運行到批數據上面。關於這一點,最早在 2014 年 Google 提出 Dataflow 計算服務的時候就批判了 streaming/batch 這種叫法,而是提出了 unbounded/bounded data 的說法。DStream 儘管是對 RDD 的封裝,但是我們要將 DStream 代碼完全轉換成 RDD 還是有一點工作量的,更何況現在 Spark 的批處理都用 DataSet/DataFrame API 了。

1. Structured Streaming 介紹

Structured Streaming 在 Spark 2.0 版本於 2016 年引入,設計思想參考了很多其他系統的思想,比如區分 processing time 和 event time,使用 relational 執行引擎提高性能等。同時也考慮了和 Spark 其他組件更好的集成。

Structured Streaming 和其他系統的顯著區別主要如下:

Incremental query model:Structured Streaming 將會在新增的流式數據上不斷執行增量查詢,同時代碼的寫法和批處理 API (基於 Dataframe 和 Dataset API)完全一樣,而且這些 API 非常的簡單。

Support for end-to-end application:Structured Streaming 和內置的 connector 使的 end-to-end 程序寫起來非常的簡單,而且 "correct by default"。數據源和 sink 滿足 "exactly-once" 語義,這樣我們就可以在此基礎上更好地和外部系統集成。

復用 Spark SQL 執行引擎:我們知道 Spark SQL 執行引擎做了非常多優化工作,比如執行計劃優化、codegen、內存管理等。這也是 Structured Streaming 取得高性能和高吞吐的一個原因。

2. Structured Streaming 核心設計

下面我們看一下 Structured Streaming 的核心設計。

Input and Output:Structured Streaming 內置了很多 connector 來保證 input 數據源和 output sink 保證 exactly-once 語義。

而實現 exactly-once 語義的前提是:

Input 數據源必須是可以 replay 的,比如 Kafka,這樣節點 crash 的時候就可以重新讀取 input 數據。常見的數據源包括 Amazon Kinesis, Apache Kafka 和文件系統。

Output sink 必須要支持寫入是冪等的。這個很好理解,如果 output 不支持冪等寫入,那麼一致性語義就是 at-least-once 了。另外對於某些 sink, Structured Streaming 還提供了原子寫入來保證 exactly-once 語義。

API:Structured Streaming 代碼編寫完全復用 Spark SQL 的 batch API,也就是對一個或者多個 stream 或者 table 進行 query。query 的結果是 result table,可以以多種不同的模式(append, update, complete)輸出到外部存儲中。另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。

Execution:復用 Spark SQL 的執行引擎。Structured Streaming 默認使用類似 Spark Streaming 的 micro-batch 模式,有很多好處,比如動態負載均衡、再擴展、錯誤恢復以及 straggler (straggler 指的是哪些執行明顯慢於其他 task 的 task)重試。除了 micro-batch 模式,Structured Streaming 還提供了基於傳統的 long-running operator 的 continuous 處理模式。

Operational Features:利用 wal 和狀態存儲,開發者可以做到集中形式的 Rollback 和錯誤恢復。還有一些其他 Operational 上的 feature,這裡就不細說了。

3. Structured Streaming 編程模型

可能是受到 Google Dataflow 的批流統一的思想的影響,Structured Streaming 將流式數據當成一個不斷增長的 table,然後使用和批處理同一套 API,都是基於 DataSet/DataFrame 的。如下圖所示,通過將流式數據理解成一張不斷增長的表,從而就可以像操作批的靜態數據一樣來操作流數據了。

在這個模型中,主要存在下面幾個組成部分:

Input Unbounded Table:流式數據的抽象表示

Query:對 input table 的增量式查詢

Result Table:Query 產生的結果表

Output:Result Table 的輸出

下面舉一個具體的例子,NetworkWordCount,代碼如下:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()

代碼實際執行流程可以用下圖來表示。把流式數據當成一張不斷增長的 table,也就是圖中的 Unbounded table of all input。然後每秒 trigger 一次,在 trigger 的時候將 query 應用到 input table 中新增的數據上,有時候還需要和之前的靜態數據一起組合成結果。query 產生的結果成為 Result Table,我們可以選擇將 Result Table 輸出到外部存儲。輸出模式有三種:

Complete mode:Result Table 全量輸出

Append mode (default):只有 Result Table 中新增的行才會被輸出,所謂新增是指自上一次 trigger 的時候。因為只是輸出新增的行,所以如果老數據有改動就不適合使用這種模式。

Update mode:只要更新的 Row 都會被輸出,相當於 Append mode 的加強版。

和 batch 模式相比,streaming 模式還提供了一些特有的算子操作,比如 window、watermark、stateful operator 等。

window,下圖是一個基於 event-time 統計 window 內事件的例子。

import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window("eventTime", "10 minutes", "5 minutes"),
$"word"
).count()

如下圖所示,窗口大小為 10 分鐘,每 5 分鐘 trigger 一次。在 12:11 時候收到了一條 12:04 的數據,也就是 late data (什麼叫 late data 呢?就是 Processing Time 比 Event Time 要晚),然後去更新其對應的 Result Table 的記錄。

watermark,是也為了處理 ,很多情況下對於這種 late data 的時效數據並沒有必要一直保留太久。比如說,數據晚了 10 分鐘或者還有點有,但是晚了 1 個小時就沒有用了,另外這樣設計還有一個好處就是中間狀態沒有必要維護那麼多。watermark 的形式化定義為 max(eventTime) - threshold,早於 watermark 的數據直接丟棄。

import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("eventTime", "10 minutes")
.groupBy(
window("eventTime", "10 minutes", "5 minutes"),
$"word")
.count()

用下圖表示更加形象。在 12:15 trigger 時 watermark 為 12:14 - 10m = 12:04,所以 late date (12:08, dog; 12:13, owl) 都被接收了。在 12:20 trigger 時 watermark 為 12:21 - 10m = 12:11,所以 late data (12:04, donkey) 都丟棄了。

除此之後 Structured Streaming 還提供了用戶可以自定義狀態計算邏輯的算子:

mapGroupsWithState

flatMapGroupsWithState

看名字大概也能看出來 mapGroupsWithState 是 one -> one,flatMapGroupsWithState 是 one -> multi。這兩個算子的底層都是基於 Spark Streaming 的 updateStateByKey。

4. Continuous Processing Mode

好,終於要介紹到「真正」的流處理了,我之所以說「真正」是因為 continuous mode 是傳統的流處理模式,通過運行一個 long-running 的 operator 用來處理數據。之前 Spark 是基於 micro-batch 模式的,就被很多人詬病不是「真正的」流式處理。continuous mode 這種處理模式只要一有數據可用就會進行處理,如下圖所示。epoch 是 input 中數據被發送給 operator 處理的最小單位,在處理過程中,epoch 的 offset 會被記錄到 wal 中。另外 continuous 模式下的 snapshot 存儲使用的一致性算法是 Chandy-Lamport 算法。

這種模式相比與 micro-batch 模式缺點和優點都很明顯。

關於為什麼延遲更低,下面兩幅圖可以做到一目了然。

對於 Structured Streaming 來說,因為有兩種模式,所以我們分開討論。

micro-batch 模式 可以提供 end-to-end 的 exactly-once 語義。原因是因為在 input 端和 output  端都做了很多工作來進行保證,比如 input 端 replayable + wal,output 端寫入冪等。

continuous 模式 只能提供 at-least-once 語義。關於 continuous mode 的官方討論的實在太少,甚至只是提了一下。在和 @李呈祥 討論之後覺得應該還是 continuous mode 由於要儘可能保證低延遲,所以在 sink 端沒有做一致性保證。

Structured Streaming 的官方論文裡面給出了 Yahoo! Streaming Benchmark 的結果,Structured Streaming 的 throughput 大概是 Flink 的 2 倍和 Kafka Streaming 的 90 多倍。

總結一下,Structured Streaming 通過提供一套 high-level 的 declarative API 使得流式計算的編寫相比 Spark Streaming 簡單容易不少,同時通過提供 end-to-end 的 exactly-once 語義。

最後,閒扯一點別的。Spark 在多年前推出基於 micro-batch 模式的 Spark Streaming 必然是基於當時 Spark Engine 最快的方式,儘管不是真正的流處理,但是在吞吐量更重要的年代,還是嘗盡了甜頭。而 Spark 的真正基於 continuous 處理模式的 Structured Streaming 直到 Spark 2.3 版本才真正推出,從而導致近兩年讓 Flink 嘗盡了甜頭(當然和 Flink 的優秀的語義模型存在很大的關係)。

在實時計算領域,目前來看,兩家的方向都是朝著 Google DataFlow 的方向。由 Spark 的卓越核心 SQL Engine 助力的 Structured Streaming,還是風頭正勁的 Flink,亦或是其他流處理引擎,究竟誰將佔領統治地位,還是值得期待一下的。

Zaharia M, Das T, Li H, et al. Discretized streams: Fault-tolerant streaming computation at scale[C]//Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013: 423-438.

Akidau T, Bradshaw R, Chambers C, et al. The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing[J]. Proceedings of the VLDB Endowment, 2015, 8(12): 1792-1803.

Armbrust M, Das T, Torres J, et al. Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark[C]//Proceedings of the 2018 International Conference on Management of Data. ACM, 2018: 601-613.

The world beyond batch: Streaming 101

The world beyond batch: Streaming 102

Streaming Systems

https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

https://en.wikipedia.org/wiki/Chandy-Lamport_algorithm

A Deep Dive Into Structured Streaming: https://databricks.com/session/a-deep-dive-into-structured-streaming

Continuous Applications: Evolving Streaming in Apache Spark 2.0: https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html

Spark Structured Streaming:A new high-level API for streaming: https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Benchmarking Structured Streaming on Databricks Runtime Against State-of-the-Art Streaming Systems: https://databricks.com/blog/2017/10/11/benchmarking-structured-streaming-on-databricks-runtime-against-state-of-the-art-streaming-systems.html

原文連結:https://zhuanlan.zhihu.com/p/51883927

多語言阿里小蜜、美團智能調度、雲上智慧機器人,眾多領域紛紛引入人工智慧,多種技術組合後打包為產品或服務,改變了不同領域的商業實踐,使垂直領域 AI 商業化進程加速,從而掀起一場智能革命。7 月 ArchSummit 全球架構師峰會,阿里、美團、騰訊等多位技術專家與你分享 AI 最新技術,掃描下圖二維碼或點擊【閱讀原文】,查看大會詳細日程:

相關焦點

  • Structured Streaming與Flink比較
    sql和structured Streaming了。狀態管理狀態維護應該是流處理非常核心的概念了,比如join,分組,聚合等操作都需要維護歷史狀態,那麼flink在這方面很好,structured Streaming也是可以,但是spark Streaming就比較弱了,只有個別狀態維護算子upstatebykye等,大部分狀態需要用戶自己維護,雖然這個對用戶來說有更大的可操作性和可以更精細控制但是帶來了編程的麻煩。
  • spark streaming流處理入門乾貨,傾力奉獻
    02spark streaming概述Spark Streaming 提供一個對於流數據的抽象 DStream。spark也同樣支持滑動窗口操作。05小結spark streaming作為spark中的流處理組件,把連續的流數據按照時間間隔劃分為一個個數據塊,然後對每個數據塊分別進行批處理
  • 大數據入門:Spark Streaming實際應用
    二、Sparkstreaming應用場景基於Spark Streaming優秀的性能表現,在很多的企業級應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等,都能夠給出合理的解決方案。
  • What does iQiyi’s IPO mean to the Chinese video streaming...
    The prisoner’s dilemma that Chinese video streaming platforms are faced withWith a mature market, the American streaming industry is pretty
  • 『 Spark 』13. Spark 2.0 Release Notes 中文版
    過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,所以一切以能夠理解為主,沒有必要的細節就不會記錄了,而且文中有時候會出現英文原版文檔,只要不影響理解,都不翻譯了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本號還是必要的。
  • iQiyi ups the ante on streaming film productions
    Following the success of Light On theater — iQiyi’s streaming platform that primarily showcases online suspense-thriller series, the company will launch theaters for romance and comedy series next
  • Spark Streaming 和 Flink 誰是數據開發者的最愛?
    val Array(brokers, topics) = args// 創建一個批處理時間是2s的context val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext
  • Spark實戰第二版(涵蓋Spark3.0)-第十章 通過結構化流接入數據
    .x.utils.streaming. app.RecordsInFilesGeneratorApp </mainClass> #2 </configuration> </execution> ...
  • 圖解SparkStreaming與Kafka的整合,細節滿滿
    我們要知道Spark作為實時計算框架,它僅僅涉及到計算,並沒有涉及到數據的存儲,所以我們後期需要使用spark對接外部的數據源。SparkStreaming作為Spark的一個子模塊,它有4個類型的數據源:socket數據源(測試的時候使用)HDFS數據源(會用到,但是用得不多)自定義數據源(不重要,沒怎麼見過別人會自定義數據源)擴展的數據源(比如kafka數據源,它非常重要,面試中也會問到)下面老劉圖解SparkStreaming與Kafka的整合,但只講原理,代碼就不貼了,網上太多了,老劉寫一些自己理解的東西
  • 【大數據嗶嗶集20210117】Spark面試題靈魂40問
    不一定,當數據規模小,Hash shuffle快於Sorted Shuffle數據規模大的時候;當數據量大,sorted Shuffle會比Hash shuffle快很多,因為數量大的有很多小文件,不均勻,甚至出現數據傾斜,消耗內存大,1.x之前spark使用hash,適合處理中小規模,1.x之後,增加了Sorted shuffle,Spark更能勝任大規模處理了。
  • 論SparkStreaming的數據可靠性和一致性
    Receiver任務啟動後,會使用Kafka的高級API來創建topicMessageStreams對象,並逐條讀取數據流緩存,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務。
  • 《Spark編程基礎(Python版)》
    教材官網為 http://dblab.xmu.edu.cn/post/spark-python/ 。認識林子雨老師是被他的數字教師和實驗室主頁震撼,他幾乎事無巨細地記錄了工作學習中的點點滴滴,這還是很少見的。數字教師:基礎數據+對外智能服務。
  • 【厚積薄發】Texture Streaming Mipmap使用疑問
    1、看到Unity新增了StreamingMipmap流,想問下loadedMipmapLevel是否是當前貼圖正在使用的Mipmap level,官方解釋是Which mipmap level is currently loaded by the streaming system.試了一下,對著一個物體走近走遠
  • 一、Spark概述
    2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 如何將 MapReduce 轉化為 Spark
    developerWorks 中國MapReduce VS Spark目前的大數據處理可以分為以下三個類型:複雜的批量數據處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間;基於歷史數據的交互式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間;基於實時數據流的數據處理(streaming
  • 大數據分析工程師入門9-Spark SQL
    最終Spark團隊放棄了Shark,推出了Spark SQL項目,其具備以下特性:標準的數據連接,支持多種數據源多種性能優化技術組件的可擴展性比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封裝了SparkContext和SQLContext。
  • Apache Spark 1.5.0 正式發布 - OSCHINA - 中文開源技術交流社區
    mode[SPARK-4752] - Classifier based on artificial neural network[SPARK-5133] - Feature Importance for Random Forests[SPARK-5155] - Python API for MQTT streaming