Flink CookBook|水位生成機制

2020-12-13 dataTECH

一、背景知識

在時間窗口裡,Flink根據元素的時間屬性,將元素歸類到特定時間窗口,應用程式可以從一個時間戳推斷出某個事件屬於哪個時間窗口,但它如何能知道它已經收到了某個時間窗口內的所有事件並且可以關閉當前窗口了呢?

為了計算基於時間窗口的聚合,我們需要能夠將事件映射到窗口,並且應用程式需要知道它何時(即便只是估計)能夠關閉一個給定的窗口並報告計算結果。window區間內可以認為有兩個基本的概念window_start_time和window_end_time,顧名思義,window_start_time代表的是區間的起始時間,而window_end_time代表的是window時間區間的結束時間,根據元素時間戳就可以將元素放到合適的窗口。時間戳可以解決元素歸類到窗口的問題,水位解決第二個問題。水位本質上是個時間戳,在flink中,水位是單調遞增的,表示數據處理的進度,當watermark >=window_end_time,即水位線大於window_end_time 時才會真正觸發窗口計算。實際上會有多種觸發機制,窗口相關的內容會在後續文章裡細聊。

更詳細的可參閱Flink CookBook—流式計算介紹

二、延遲計算和數據完整性

水印是數據延遲和結果正確性之間的一種平衡,在計算之前,需要等多長時間來確定數據完全到達。在用基於事件時間的算子裡,水印決定攝取到的數據完整性和計算的進度。但現實場景裡,沒有完美的水印,因為那意味著始終沒有數據延遲。生成不準確的水印,可能導致數據不完整或增加不必要的應用延遲。

如果生成的水印較已處理元素的時間戳更早,窗口可以接受更多落後的元素,但可能增加計算結果的延遲,而且通常狀態的大小也會更大,因為應用需要緩存更多的數據;但是如果水印生成的頻率比較大(水印可能大於某些延遲消息的時間戳),則產生的結果可能不準確,但延遲會比較低。

三、水位生成器分類

在Flink中使用事件時間時,必須對流中事件進行時間戳賦值。因為在基於事件時間上的所有操作都會使用事件時間,所以儘早為消息分配時間戳是個不錯的實踐,建議在SourceFunction之後就配置時間戳和水印生成策略。當然,如果操作不會引起數據流的重分配,也可以在操作之後設置水位生成器,比如在filter算子之後配置策略。Flink會根據設置的水印生成器策略為數據流中的元素分配時間戳,並生成水印,以表示時間進度。

有多種方法配置時間戳、水位生成策略,比較常用的有兩種方法配置:

調用DataStream的assignTimestampsAndWatermarks方法,內置了水位生成機制調用DataStream的assignTimestamps方法,時間屬性抽取、水位生成、當前水位獲取都要自定義實現;新版本不的API不建議使用。水位生成器分類如下:

3.1 AssignerWithPeriodicWatermarks

根據指定的時間間隔周期性的發射水位。Flink實現了兩種周期性水位生成器:

AscendingTimestampExtractor升序模式,會將數據中的時間戳根據指定欄位抽取,並用當前時間戳作為最新水印,這種比較適用於數據流裡事件是按順序生成,沒有亂序情況。

子類實現extractAscendingTimestamp方法,從元素裡抽取時間戳(currentTimestamp),然後根據這個時間戳創建水印(new Watermark(currentTimestamp)),如果下個元素時間戳比currentTimestamp小,創建的水印的時間戳保持不變,這樣可以保證水印是單調遞增的,而且該元素會拋棄掉,因為時間比水印早,也即是如果一個元素的時間戳比上個元素時間戳早,就會拋棄掉。

BoundedOutOfOrdernessTimestampExtractor通過固定的時間間隔來指定水印落後於時間戳的區間長度,也就是最長容忍遲多長的時間內到達,適用於了解消息最大延遲時間的情況下。

類的構造函數傳入一個時間,指定最大的落後時間;子類實現extractTimestamp方法,從元素裡抽取時間戳。同樣也是先從元素裡抽取到時間戳(currentTimestamp),但是創建水印的方式是new Watermark(currentTimestamp-maxOutOfOrderness),如果下個元素的時間戳在水印之後,即currentTimestamp-maxOutOfOrderness之後,就不會丟棄,所以允許消息最大落後maxOutOfOrderness個時間單位。

示意圖見下圖,紅色元素時間戳在水位之前,會丟棄;綠色元素時間戳在水位範圍內,是有效元素;藍色是周期性觸發生成水印時,生成的水印:

3.2 AssignerWithPunctuatedWatermarks

算子在每次處理元素時就發射水印,即是在每次抽取元素的時間戳後,根據元素和元素的時間戳馬上生成水位。可完全控制水位的生成,設置的水位比較精準,但當數據量大時,頻繁的emit水位會降低降低性能。適用於基於輸入元素的其他屬性生成水位、元素裡得屬性可以明確得指定流處理進度的情況。子類實現checkAndGetNextWatermark方法,該方法接收兩個參數,第一個參數是元素值,第二個參數是元素的時間戳,方法最終返回一個水位。

3.2 TimestampExtractor

上面兩種方式的混合體:算子在每次處理元素時就發射水位並同時支持周期性發射水位。可完全控制水位生成。

四、水位emit實現過程

水印生成的實現類似於一個transformation算子:算子會作用到流裡的每個元素,生成一個新的帶有時間戳元素的數據流,並發射水印。水位生成器不會改變數據流的元素類型。

首先算子每接收到一個元素時,會在processElement方法裡對元素進行處理,生成一個新的數據流,為數據流裡的元素增加時間戳屬性:

// 1、根據設置的TimestampAssigner獲取元素時間戳final long newTimestamp = userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);// 2、根據時間戳重新生成時間,即為事件增加時間屬性StreamRecord recordWithTT = new StreamRecord(element.getValue(), newTimestamp)// 3、輸出新的數據流output.collect(recordWithTT);

有三種水位生成實現算子,對應三種水位生成器方式,下面分別介紹。

4.1 TimestampsAndPeriodicWatermarksOperator

為數據流的元素分配時間戳,並定期生成水位。水位生成器時間間隔可以ExecutionConfig.setAutoWatermarkInterval方法設置,默認200毫秒。程序裡根據ScheduledThreadPoolExecutor延遲某個時間後,調度TriggerTask,TriggerTask的run方法裡回調onProcessingTime方法,該方法裡實現水印發送:

水印的定時生成,不是通過調度任務周期性調用的,而是每次註冊一個新的線程(scheduledExecutor.schedule),然後延遲執行,線程執行時會回調onProcessingTime方法,實現水印循環生成。這這個地方如果該用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法,周期性的執行線程會更好,不用每次都重新創建線程、將線程放到線程調度池裡。

這種水印生成方法是周期性調用的,所以即使數據流裡沒有元素,也會定時調用,獲取當前水印,只是不會最終emit水印(沒有元素的情況下,水印不會向前移動)。

4.2 TimestampsAndPunctuatedWatermarksOperator

水印的生成和元素的處理是綁定在一起,元素處理好就理解emit水位,即直接在算子的processElement方法裡調用checkAndGetNextWatermark獲取該元素的水印,這個函數是要用戶自定義實現的,可以完全控制水印生成。如果返回的水印大於最近emit的水印,則發射新的水印。

// 1、調用水印生成器(TimestampAssigner)方法,抽取數據流中元素的時間戳final long newTimestamp = userFunction.extractTimestamp(element.value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);// 2、調用水印生成器(TimestampAssigner)方法,獲取該元素的水印final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(element.value, newTimestamp);// 3、將返回的水印和最近發射的水印進行比較if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {// 如果水印是遞增的,就發射新的水印 currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark);}

因為是對每個元素都會生成水印,所以發射水印可能會很頻繁,適用於元素裡可以明確標識水印的場景。

4.3 ExtractTimestampsOperator

上面兩種發射水位的結合形成的一種方案,即支持周期性水位發射,又支持在每次處理元素時就發射水位。

怎麼給元素分配到合適的窗口?窗口怎麼一個觸發機制?這些問題留待後續探討。

相關焦點

  • Flink CookBook|Process Function解析
    KeyedProcessFunctionKeyedProcessFunction作用於KeyedStream上,生成 一個新的DataStream,該轉換函數會針對流中的每條記錄調用一次,並emit零個、一個或多個記錄。
  • Structured Streaming與Flink比較
    Structured Streaming 周期性或者連續不斷的生成微小dataset,然後交由Spark SQL的增量引擎執行,跟Spark Sql的原有引擎相比,增加了增量處理的功能,增量就是為了狀態和流表功能實現。
  • Flink 端到端 Exactly-once 機制剖析
    但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:      這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。
  • Flink 是如何將你寫的代碼生成 StreamGraph 的 (上篇)
    一般我們執行一個 Flink 程序,都是使用命令行 flink run(flink 界面上執行的時候,也是在調用 flink run 命令來執行的)來執行,然後shell 會使用 java 命令,執行到 CliFrontend 類的 main 方法。
  • 一篇文章讓深入理解Flink SQL 時間特性
    一、處理時間(Processing Time)         處理時間語義下,允許表處理程序根據機器的本地時間生成結果。它是時間的最簡單概念。它既不需要提取時間戳,也不需要生成 watermark。_import org.apache.flink.table.api.Tableimport org.apache.flink.table.api.scala.
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    22、OPPO數據中臺之基石:基於Flink SQL構建實數據倉庫23、流計算框架 Flink 與 Storm 的性能對比24、Flink狀態管理和容錯機制介紹25、原理解析 | Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理26、Apache Flink 是如何管理好內存的?
  • 萬字詳解Flink極其重要的Time與Window(建議收藏)
    它通常由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。Ingestion Time:是數據進入Flink的時間。 1import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 2import org.apache.flink.api.scala.
  • 深入解析 Flink 的算子鏈機制
    這種情況幾乎都不是程序有問題,而是因為 Flink 的 operator chain ——即算子鏈機制導致的,即提交的作業的執行計劃中,所有算子的並發實例(即 sub-task )都因為滿足特定條件而串成了整體來執行,自然就觀察不到算子之間的數據流量了。當然上述是一種特殊情況。
  • flink-1.12.0 upsert-kafka connector demo
    參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh
  • 開篇|揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?
    查詢處理器是 Planner 的具體實現, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程將 Table API & SQL作業轉換成 Flink Runtime 可識別的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的調度和執行。
  • Flink SQL 實戰:HBase 的結合應用
    可以實時寫 HBase,也可以利用 buckload 一把把離線 Job 生成 HFile Load 到HBase 表中。而當下 Flink SQL 的火熱程度不用多說,Flink SQL 也為 HBase 提供了 connector,因此 HBase 與 Flink SQL 的結合非常有必要實踐實踐。
  • Flink保證端到端exactly-once語義(也適用於kafka)
    接下來,我們進一步介紹flink的這個特性:    •flink的checkpoints在保證exactly-once語義是的作用    •flink是如何通過兩部提交協議來保證從數據源到數據輸出的exactly-once語義    •通過一個例子來解釋如果應用TwoPhaseCommitSinkFunction來實現一個exactly-once的sinkexactly-once
  • Apache Flink 誤用之痛
    郵件列表:user@flink.apache.com/user-zh@flink.apache.orgStack Overflow:www.stackoverflow.com2.第一個,Flink 作業一般都有狀態讀取,做升級時需要有 savepoint 機制來保障,將狀態存儲保留在遠端,再恢復到新的作業上去。
  • 數據說話:大數據處理引擎Spark與Flink比拼
    每個算子(如 map,filter,join)生成一個新的 RDD。所有的算子組成一個有向無環圖(DAG)。Spark 比較簡單地把邊分為寬依賴和窄依賴。上下遊數據不需要 shuffle 的即為窄依賴,可以把上下遊的算子放在一個階段(stage) 裡在本地連續處理,這時上遊的結果 RDD 可以 省略。下圖展示了相關的基本概念。更詳細的介紹在網上比較容易找到,這裡就不花太多篇幅了。
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    數據在源源不斷的進入flink,我們設置好window的大小為5s,flink會以5s來將每分鐘劃分為連續的多個窗口。org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.scala.function.WindowFunctionimport org.apache.flink.streaming.api.scala.
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    </groupId>  <artifactId>flink-java</artifactId>  <version>1.5.5</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId
  • Flink CookBook—流式計算介紹
    數據完整性:對於延遲到達的數據,由於到達時對應的時間窗口已經關閉,這些數據無法得到計算(可能被丟棄),這會影響最終結果的正確性計算延遲:對於過早到達的數據,需要等待足夠長的時間才能得到計算,對於這些數據而言,計算延遲過高,無法儘快看到結果數據源空閒:如果一直沒有數據到達,按照當前的水印生成方法
  • Python3 CookBook中文版
    項目主頁https://github.com/yidao620c/python3-cookbook
  • Spark Streaming 和 Flink 誰是數據開發者的最愛?
    而實際上,Flink 內部對 Poll 出來的數據進行了整理,然後逐條 emit,形成了事件觸發的機制。調用 env.execute 相比於 Spark Streaming 少了設置批處理時間,還有一個顯著的區別是 flink 的所有算子都是 lazy 形式的,調用 env.execute 會構建 jobgraph。