本文為「美圖數據技術團隊」投稿
本文從編程模型、任務調度、時間機制、Kafka 動態分區的感知、容錯及處理語義、背壓等幾個方面對比 Spark Streaming 與 Flink,希望對有實時處理需求業務的企業端用戶在框架選型有所啟發。
前排友情提示:本文篇幅較長,建議先收藏。
編程模型對比
運行角色
Spark Streaming 運行時的角色(standalone 模式)主要有:
Master:主要負責整體集群資源的管理和應用程式調度;Worker:負責單個節點的資源管理,driver 和 executor 的啟動等;Driver:用戶入口程序執行的地方,即 SparkContext 執行的地方,主要是 DAG 生成、stage 劃分、task 生成及調度;Executor:負責執行 task,反饋執行狀態和執行結果。Flink 運行時的角色(standalone 模式)主要有:
Jobmanager:協調分布式執行,他們調度任務、協調 checkpoints、協調故障恢復等。至少有一個 JobManager。高可用情況下可以啟動多個 JobManager,其中一個選舉為 leader,其餘為 standby;Taskmanager: 負責執行具體的 tasks、緩存、交換數據流,至少有一個 TaskManager;Slot:每個 task slot 代表 TaskManager 的一個固定部分資源,Slot 的個數代表著 taskmanager 可並行執行的 task 數。生態
(圖 1:Spark Streaming 生態,via Spark 官網)
(圖 2:Flink 生態,via Flink官網)
運行模型
Spark Streaming 是微批處理,運行的時候需要指定批處理的時間,每次運行 job 時處理一個批次的數據,流程如圖 3 所示:
(圖 3:via Spark 官網)
Flink 是基於事件驅動的,事件可以理解為消息。事件驅動的應用程式是一種狀態應用程式,它會從一個或者多個流中注入事件,通過觸發計算更新狀態,或外部動作對注入的事件作出反應。
(圖 4:via Fink 官網)
編程模型對比
編程模型對比,主要是對比 Flink 和 Spark Streaming 兩者在代碼編寫上的區別。
Spark Streaming
Spark Streaming 與 Kafka 的結合主要是兩種模型:
基於 receiver dstream;基於 direct dstream。以上兩種模型編程機構近似,只是在 API 和內部數據獲取有些區別,新版本的已經取消了基於 receiver 這種模式,企業中通常採用基於 direct Dstream 的模式。
val Array(brokers, topics) = args// 創建一個批處理時間是2s的context val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) // 使用broker和topic創建DirectStream val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Get the lines, split them into words, count the words and print val lines = messages.map(_.value) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() // 啟動流 ssc.start() ssc.awaitTermination()
通過以上代碼我們可以 Get 到:
設置批處理時間;創建數據流;編寫 transform;編寫 action;啟動執行。Flink
接下來看 Flink 與 Kafka 結合是如何編寫代碼的。Flink 與 Kafka 結合是事件驅動,大家可能對此會有疑問,消費 Kafka 的數據調用 Poll 的時候是批量獲取數據的(可以設置批處理大小和超時時間),這就不能叫做事件觸發了。
而實際上,Flink 內部對 Poll 出來的數據進行了整理,然後逐條 emit,形成了事件觸發的機制。
下面的代碼是 Flink 整合 Kafka 作為 data source 和 data sink:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(5000); // create a checkpoint every 5 seconds env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // ExecutionConfig.GlobalJobParameters env.getConfig().setGlobalJobParameters(null); DataStream<KafkaEvent> input = env .addSource( new FlinkKafkaConsumer010<>( parameterTool.getRequired("input-topic"), new KafkaEventSchema(), parameterTool.getProperties()) .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance() .keyBy("word") .map(new RollingAdditionMapper()).setParallelism(0); input.addSink( new FlinkKafkaProducer010<>( parameterTool.getRequired("output-topic"), new KafkaEventSchema(), parameterTool.getProperties())); env.execute("Kafka 0.10 Example");
從 Flink 與 Kafka 結合的代碼可以 Get 到:
註冊數據 source;編寫運行邏輯;註冊數據 sink。調用 env.execute 相比於 Spark Streaming 少了設置批處理時間,還有一個顯著的區別是 flink 的所有算子都是 lazy 形式的,調用 env.execute 會構建 jobgraph。client 端負責 Jobgraph 生成並提交它到集群運行;而 Spark Streaming的操作算子分 action 和 transform,其中僅有 transform 是 lazy 形式,而且 DAG 生成、stage 劃分、任務調度是在 driver 端進行的,在 client 模式下 driver 運行於客戶端處。
任務調度原理
Spark 任務調度
Spark Streaming 任務如上文提到的是基於微批處理的,實際上每個批次都是一個 Spark Core 的任務。對於編碼完成的 Spark Core 任務在生成到最終執行結束主要包括以下幾個部分:
構建 DAG 圖;劃分 stage;生成 taskset;調度 task。具體可參考圖 5:
(圖 5:Spark 任務調度)
對於 job 的調度執行有 fifo 和 fair 兩種模式,Task 是根據數據本地性調度執行的。
假設每個 Spark Streaming 任務消費的 Kafka topic 有四個分區,中間有一個 transform操作(如 map)和一個 reduce 操作,如圖 6 所示:
(圖 6 )
假設有兩個 executor,其中每個 executor 三個核,那麼每個批次相應的 task 運行位置是固定的嗎?是否能預測?
由於數據本地性和調度不確定性,每個批次對應 Kafka 分區生成的 task 運行位置並不是固定的。
Flink 任務調度
對於 Flink 的流任務客戶端首先會生成 StreamGraph,接著生成 JobGraph,然後將 jobGraph 提交給 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的轉變,最後由 jobManager 調度執行。
(圖 7)
如圖 7 所示有一個由 data source、MapFunction和 ReduceFunction 組成的程序,data source 和 MapFunction 的並發度都為 4,而 ReduceFunction 的並發度為 3。
一個數據流由 Source-Map-Reduce 的順序組成,在具有 2 個TaskManager、每個 TaskManager 都有 3 個 Task Slot 的集群上運行。
可以看出 Flink 的拓撲生成提交執行之後,除非故障,否則拓撲部件執行位置不變,並行度由每一個算子並行度決定,類似於 Storm。
而 Spark Streaming 是每個批次都會根據數據本地性和資源情況進行調度,無固定的執行拓撲結構。
Flink 是數據在拓撲結構裡流動執行,而 Spark Streaming 則是對數據緩存批次並行處理。
時間機制對比
流處理的時間
流處理程序在時間概念上總共有三個時間概念:
處理時間處理時間是指每臺機器的系統時間,當流程序採用處理時間時將使用運行各個運算符實例的機器時間。處理時間是最簡單的時間概念,不需要流和機器之間的協調,它能提供最好的性能和最低延遲。
然而在分布式和異步環境中,處理時間不能提供消息事件的時序性保證,因為它受到消息傳輸延遲,消息在算子之間流動的速度等方面制約。
事件時間事件時間是指事件在其設備上發生的時間,這個時間在事件進入 Flink 之前已經嵌入事件,然後 Flink 可以提取該時間。基於事件時間進行處理的流程序可以保證事件在處理的時候的順序性,但是基於事件時間的應用程式必須要結合 watermark 機制。
基於事件時間的處理往往有一定的滯後性,因為它需要等待後續事件和處理無序事件,對於時間敏感的應用使用的時候要慎重考慮。
注入時間注入時間是事件注入到 Flink 的時間。事件在 source 算子處獲取 source 的當前時間作為事件注入時間,後續的基於時間的處理算子會使用該時間處理數據。
相比於事件時間,注入時間不能夠處理無序事件或者滯後事件,但是應用程式無序指定如何生成 watermark。
在內部注入時間程序的處理和事件時間類似,但是時間戳分配和 watermark 生成都是自動的。
圖 8 可以清晰地看出三種時間的區別:
(圖 8)
Spark 時間機制
Spark Streaming 只支持處理時間,Structured streaming 支持處理時間和事件時間,同時支持 watermark 機制處理滯後數據。
Flink 時間機制
Flink 支持三種時間機制:事件時間、注入時間、處理時間,同時支持 watermark 機制處理滯後數據。
Kafka 動態分區檢測
Spark Streaming
對於有實時處理業務需求的企業,隨著業務增長數據量也會同步增長,將導致原有的 Kafka 分區數不滿足數據寫入所需的並發度,需要擴展 Kafka 的分區或者增加 Kafka 的 topic,這時就要求實時處理程序,如 SparkStreaming、Flink 能檢測到 Kafka 新增的 topic 、分區及消費新增分區的數據。
接下來結合源碼分析,Spark Streaming 和 Flink 在 Kafka 新增 topic 或 partition 時能否動態發現新增分區並消費處理新增分區的數據。
Spark Streaming 與 Kafka 結合有兩個區別比較大的版本,如圖 9 所示是官網給出的對比數據:
(圖 9)
其中確認的是 Spark Streaming 與 Kafka 0.8 版本結合不支持動態分區檢測,與 0.10 版本結合支持,接著通過源碼分析。
Spark Streaming 與 kafka 0.8 版本結合(*源碼分析只針對分區檢測)
入口是 DirectKafkaInputDStream 的 compute:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {// 改行代碼會計算這個job,要消費的每個kafka分區的最大偏移val untilOffsets = clamp(latestLeaderOffsets(maxRetries))// 構建KafkaRDD,用指定的分區數和要消費的offset範圍val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker.val offsetRanges = currentOffsets.map { case (tp, fo) =>val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset) } val description = offsetRanges.filter { offsetRange =>// Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange => s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the userval metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) }
第一行就是計算得到該批次生成 KafkaRDD 每個分區要消費的最大 offset。 接著看 latestLeaderOffsets(maxRetries)。
@tailrecprotectedfinal def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用來指定獲取最大偏移分區的列表還是只有currentOffsets,沒有發現關於新增的分區的內容。val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manuallyif (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)} else { logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs) latestLeaderOffsets(retries - 1) } } else { o.right.get } }
其中 protected var currentOffsets = fromOffsets,這個僅僅是在構建 DirectKafkaInputDStream 的時候初始化,並在 compute 裡面更新:
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
中間沒有檢測 Kafka 新增 topic 或者分區的代碼,所以可以確認 Spark Streaming 與 kafka 0.8 的版本結合不支持動態分區檢測。
Spark Streaming 與 Kafka 0.10 版本結合
入口同樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說,Compute 裡第一行也是計算當前 job 生成 kafkardd 要消費的每個分區的最大 offset:
// 獲取當前生成job,要用到的KafkaRDD每個分區最大消費偏移值val untilOffsets = clamp(latestOffsets())
具體檢測 Kafka 新增 topic 或者分區的代碼在 latestOffsets()
/** * Returns the latest (highest) available offsets, taking new partitions into account. */protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer paranoidPoll(c) // 獲取所有的分區信息 val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets// 做差獲取新增的分區信息 val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit// 新分區消費位置,沒有記錄的化是由auto.offset.reset決定 currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) parts.map(tp => tp -> c.position(tp)).toMap }
該方法內有獲取 Kafka 新增分區,並將其更新到 currentOffsets 的過程,所以可以驗證 Spark Streaming 與 Kafka 0.10 版本結合支持動態分區檢測。
Flink
入口類是 FlinkKafkaConsumerBase,該類是所有 Flink 的 Kafka 消費者的父類。
(圖 10)
在 FlinkKafkaConsumerBase 的 run 方法中,創建了 kafkaFetcher,實際上就是消費者:
this.kafkaFetcher = createFetcher(sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics);
接是創建了一個線程,該線程會定期檢測 Kafka 新增分區,然後將其添加到 kafkaFetcher 裡。
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Overridepublicvoidrun(){ try { // --------------------- partition discovery loop ---------------------List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before// performing the next operation, so that we can escape the loop as soon as possiblewhile (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask()); } try { discoveredPartitions = partitionDiscoverer.discoverPartitions(); } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;// this would only happen if the consumer was canceled; simply escape the loopbreak; } // no need to add the discovered partitions if we were closed during the meantimeif (running && !discoveredPartitions.isEmpty()) { kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); } // do not waste any time sleeping if we're not running anymoreif (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loopbreak; } } } } catch (Exception e) { discoveryLoopErrorRef.set(e); } finally { // calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) { cancel(); } } } }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks()); discoveryLoopThread.start(); kafkaFetcher.runFetchLoop();
上面,就是 Flink 動態發現 Kafka 新增分區的過程。不過與 Spark 無需做任何配置不同的是,Flink 動態發現 Kafka 新增分區,這個功能需要被使能的。
也很簡單,需要將 flink.partition-discovery.interval-millis 該屬性設置為大於 0 即可。
容錯機制及處理語義
本節內容主要是想對比兩者在故障恢復及如何保證僅一次的處理語義。這個時候適合拋出一個問題:實時處理的時候,如何保證數據僅一次處理語義?
Spark Streaming 保證僅一次處理
對於 Spark Streaming 任務,我們可以設置 checkpoint,然後假如發生故障並重啟,我們可以從上次 checkpoint 之處恢復,但是這個行為只能使得數據不丟失,可能會重複處理,不能做到恰一次處理語義。
對於 Spark Streaming 與 Kafka 結合的 direct Stream 可以自己維護 offset 到 Zookeeper、Kafka 或任何其它外部系統,每次提交完結果之後再提交 offset,這樣故障恢復重啟可以利用上次提交的 offset 恢復,保證數據不丟失。
但是假如故障發生在提交結果之後、提交 offset 之前會導致數據多次處理,這個時候我們需要保證處理結果多次輸出不影響正常的業務。
由此可以分析,假設要保證數據恰一次處理語義,那麼結果輸出和 offset 提交必須在一個事務內完成。在這裡有以下兩種做法:
repartition(1) Spark Streaming 輸出的 action 變成僅一個 partition,這樣可以利用事務去做:Dstream.foreachRDD(rdd=>{ rdd.repartition(1).foreachPartition(partition=>{ // 開啟事務 partition.foreach(each=>{// 提交數據 }) // 提交事務 }) })
將結果和 offset 一起提交。也就是結果數據包含 offset。這樣提交結果和提交 offset 就是一個操作完成,不會數據丟失,也不會重複處理。故障恢復的時候可以利用上次提交結果帶的 offset。
Flink 與 kafka 0.11 保證僅一次處理
若要 sink 支持僅一次語義,必須以事務的方式寫數據到 Kafka,這樣當提交事務時兩次 checkpoint 間的所有寫入操作作為一個事務被提交。這確保了出現故障或崩潰時這些寫入操作能夠被回滾。
在一個分布式且含有多個並發執行 sink 的應用中,僅僅執行單次提交或回滾是不夠的,因為所有組件都必須對這些提交或回滾達成共識,這樣才能保證得到一致性的結果。
Flink 使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。
本例中的 Flink 應用如圖 11 所示包含以下組件:
一個source,從Kafka中讀取數據(即KafkaConsumer);一個時間窗口化的聚會操作;一個sink,將結果寫回到Kafka(即KafkaProducer)。
(圖 11)
下面詳細講解 Flink 的兩段提交思路:
(圖 12)
如圖 12 所示,Flink checkpointing 開始時便進入到 pre-commit 階段。
具體來說,一旦 checkpoint 開始,Flink 的 JobManager 向輸入流中寫入一個 checkpoint barrier ,將流中所有消息分割成屬於本次 checkpoint 的消息以及屬於下次 checkpoint 的,barrier 也會在操作算子間流轉。
對於每個 operator 來說,該 barrier 會觸發 operator 狀態後端為該 operator 狀態打快照。
data source 保存了 Kafka 的 offset,之後把 checkpoint barrier 傳遞到後續的 operator。
這種方式僅適用於 operator 僅有它的內部狀態。內部狀態是指 Flink state backends 保存和管理的內容(如第二個 operator 中 window 聚合算出來的 sum)。
當一個進程僅有它的內部狀態的時候,除了在 checkpoint 之前將需要將數據更改寫入到 state backend,不需要在預提交階段做其他的動作。
在 checkpoint 成功的時候,Flink 會正確的提交這些寫入,在 checkpoint 失敗的時候會終止提交,過程可見圖 13。
(圖 13)
當結合外部系統的時候,外部系統必須要支持可與兩階段提交協議捆綁使用的事務。
顯然本例中的 sink 由於引入了 kafka sink,因此在預提交階段 data sink 必須預提交外部事務。如下圖:
(圖 14)
當 barrier 在所有的算子中傳遞一遍,並且觸發的快照寫入完成,預提交階段完成。
所有的觸發狀態快照都被視為 checkpoint 的一部分,也可以說 checkpoint 是整個應用程式的狀態快照,包括預提交外部狀態。出現故障可以從 checkpoint 恢復。
下一步就是通知所有的操作算子 checkpoint 成功。該階段 jobmanager 會為每個 operator 發起 checkpoint 已完成的回調邏輯。
本例中 data source 和窗口操作無外部狀態,因此該階段,這兩個算子無需執行任何邏輯,但是 data sink 是有外部狀態的,因此,此時我們必須提交外部事務,如下圖:
(圖 15)
以上就是 Flink 實現恰一次處理的基本邏輯。
Back pressure
消費者消費的速度低於生產者生產的速度,為了使應用正常,消費者會反饋給生產者來調節生產者生產的速度,以使得消費者需要多少,生產者生產多少。(*back pressure 後面一律稱為背壓。)
Spark Streaming 的背壓
Spark Streaming 跟 Kafka 結合是存在背壓機制的,目標是根據當前 job 的處理情況來調節後續批次的獲取 Kafka 消息的條數。
為了達到這個目的,Spark Streaming 在原有的架構上加入了一個 RateController,利用的算法是 PID,需要的反饋數據是任務處理的結束時間、調度時間、處理時間、消息條數。
這些數據是通過 SparkListener 體系獲得,然後通過 P 的 compute 計算得到一個速率,進而可以計算得到一個 offset,然後跟限速設置最大消費條數比較得到一個最終要消費的消息最大 offset。
P 的 compute 方法如下:
def compute( time: Long, // in millisecondsnumElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long// in milliseconds ): Option[Double] = { logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000val processingRate = numElements.toDouble / processingDelay * 1000val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate) logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) latestTime = time if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None } else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } } else { logTrace("Rate estimation skipped") None } } }
Flink 的背壓
與 Spark Streaming 的背壓不同的是,Flink 背壓是 jobmanager 針對每一個 task 每 50ms 觸發 100 次 Thread.getStackTrace() 調用,求出阻塞的佔比。過程如圖 16 所示:
(圖 16)
阻塞佔比在 web 上劃分了三個等級:
OK: 0 <= Ratio <= 0.10,表示狀態良好;LOW: 0.10 < Ratio <= 0.5,表示有待觀察;HIGH: 0.5 < Ratio <= 1,表示要處理了。本文為作者投稿,版權歸對方所有。