一、背景知識
在時間窗口裡,Flink根據元素的時間屬性,將元素歸類到特定時間窗口,應用程式可以從一個時間戳推斷出某個事件屬於哪個時間窗口,但它如何能知道它已經收到了某個時間窗口內的所有事件並且可以關閉當前窗口了呢?
為了計算基於時間窗口的聚合,我們需要能夠將事件映射到窗口,並且應用程式需要知道它何時(即便只是估計)能夠關閉一個給定的窗口並報告計算結果。window區間內可以認為有兩個基本的概念window_start_time和window_end_time,顧名思義,window_start_time代表的是區間的起始時間,而window_end_time代表的是window時間區間的結束時間,根據元素時間戳就可以將元素放到合適的窗口。時間戳可以解決元素歸類到窗口的問題,水位解決第二個問題。水位本質上是個時間戳,在flink中,水位是單調遞增的,表示數據處理的進度,當watermark >=window_end_time,即水位線大於window_end_time 時才會真正觸發窗口計算。實際上會有多種觸發機制,窗口相關的內容會在後續文章裡細聊。
更詳細的可參閱Flink CookBook—流式計算介紹
二、延遲計算和數據完整性
水印是數據延遲和結果正確性之間的一種平衡,在計算之前,需要等多長時間來確定數據完全到達。在用基於事件時間的算子裡,水印決定攝取到的數據完整性和計算的進度。但現實場景裡,沒有完美的水印,因為那意味著始終沒有數據延遲。生成不準確的水印,可能導致數據不完整或增加不必要的應用延遲。
如果生成的水印較已處理元素的時間戳更早,窗口可以接受更多落後的元素,但可能增加計算結果的延遲,而且通常狀態的大小也會更大,因為應用需要緩存更多的數據;但是如果水印生成的頻率比較大(水印可能大於某些延遲消息的時間戳),則產生的結果可能不準確,但延遲會比較低。
三、水位生成器分類
在Flink中使用事件時間時,必須對流中事件進行時間戳賦值。因為在基於事件時間上的所有操作都會使用事件時間,所以儘早為消息分配時間戳是個不錯的實踐,建議在SourceFunction之後就配置時間戳和水印生成策略。當然,如果操作不會引起數據流的重分配,也可以在操作之後設置水位生成器,比如在filter算子之後配置策略。Flink會根據設置的水印生成器策略為數據流中的元素分配時間戳,並生成水印,以表示時間進度。
有多種方法配置時間戳、水位生成策略,比較常用的有兩種方法配置:
調用DataStream的assignTimestampsAndWatermarks方法,內置了水位生成機制調用DataStream的assignTimestamps方法,時間屬性抽取、水位生成、當前水位獲取都要自定義實現;新版本不的API不建議使用。水位生成器分類如下:
3.1 AssignerWithPeriodicWatermarks
根據指定的時間間隔周期性的發射水位。Flink實現了兩種周期性水位生成器:
AscendingTimestampExtractor升序模式,會將數據中的時間戳根據指定欄位抽取,並用當前時間戳作為最新水印,這種比較適用於數據流裡事件是按順序生成,沒有亂序情況。
子類實現extractAscendingTimestamp方法,從元素裡抽取時間戳(currentTimestamp),然後根據這個時間戳創建水印(new Watermark(currentTimestamp)),如果下個元素時間戳比currentTimestamp小,創建的水印的時間戳保持不變,這樣可以保證水印是單調遞增的,而且該元素會拋棄掉,因為時間比水印早,也即是如果一個元素的時間戳比上個元素時間戳早,就會拋棄掉。
BoundedOutOfOrdernessTimestampExtractor通過固定的時間間隔來指定水印落後於時間戳的區間長度,也就是最長容忍遲多長的時間內到達,適用於了解消息最大延遲時間的情況下。
類的構造函數傳入一個時間,指定最大的落後時間;子類實現extractTimestamp方法,從元素裡抽取時間戳。同樣也是先從元素裡抽取到時間戳(currentTimestamp),但是創建水印的方式是new Watermark(currentTimestamp-maxOutOfOrderness),如果下個元素的時間戳在水印之後,即currentTimestamp-maxOutOfOrderness之後,就不會丟棄,所以允許消息最大落後maxOutOfOrderness個時間單位。
示意圖見下圖,紅色元素時間戳在水位之前,會丟棄;綠色元素時間戳在水位範圍內,是有效元素;藍色是周期性觸發生成水印時,生成的水印:
3.2 AssignerWithPunctuatedWatermarks
算子在每次處理元素時就發射水印,即是在每次抽取元素的時間戳後,根據元素和元素的時間戳馬上生成水位。可完全控制水位的生成,設置的水位比較精準,但當數據量大時,頻繁的emit水位會降低降低性能。適用於基於輸入元素的其他屬性生成水位、元素裡得屬性可以明確得指定流處理進度的情況。子類實現checkAndGetNextWatermark方法,該方法接收兩個參數,第一個參數是元素值,第二個參數是元素的時間戳,方法最終返回一個水位。
3.2 TimestampExtractor
上面兩種方式的混合體:算子在每次處理元素時就發射水位並同時支持周期性發射水位。可完全控制水位生成。
四、水位emit實現過程
水印生成的實現類似於一個transformation算子:算子會作用到流裡的每個元素,生成一個新的帶有時間戳元素的數據流,並發射水印。水位生成器不會改變數據流的元素類型。
首先算子每接收到一個元素時,會在processElement方法裡對元素進行處理,生成一個新的數據流,為數據流裡的元素增加時間戳屬性:
// 1、根據設置的TimestampAssigner獲取元素時間戳final long newTimestamp = userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);// 2、根據時間戳重新生成時間,即為事件增加時間屬性StreamRecord recordWithTT = new StreamRecord(element.getValue(), newTimestamp)// 3、輸出新的數據流output.collect(recordWithTT);
有三種水位生成實現算子,對應三種水位生成器方式,下面分別介紹。
4.1 TimestampsAndPeriodicWatermarksOperator
為數據流的元素分配時間戳,並定期生成水位。水位生成器時間間隔可以ExecutionConfig.setAutoWatermarkInterval方法設置,默認200毫秒。程序裡根據ScheduledThreadPoolExecutor延遲某個時間後,調度TriggerTask,TriggerTask的run方法裡回調onProcessingTime方法,該方法裡實現水印發送:
水印的定時生成,不是通過調度任務周期性調用的,而是每次註冊一個新的線程(scheduledExecutor.schedule),然後延遲執行,線程執行時會回調onProcessingTime方法,實現水印循環生成。這這個地方如果該用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法,周期性的執行線程會更好,不用每次都重新創建線程、將線程放到線程調度池裡。
這種水印生成方法是周期性調用的,所以即使數據流裡沒有元素,也會定時調用,獲取當前水印,只是不會最終emit水印(沒有元素的情況下,水印不會向前移動)。
4.2 TimestampsAndPunctuatedWatermarksOperator
水印的生成和元素的處理是綁定在一起,元素處理好就理解emit水位,即直接在算子的processElement方法裡調用checkAndGetNextWatermark獲取該元素的水印,這個函數是要用戶自定義實現的,可以完全控制水印生成。如果返回的水印大於最近emit的水印,則發射新的水印。
// 1、調用水印生成器(TimestampAssigner)方法,抽取數據流中元素的時間戳final long newTimestamp = userFunction.extractTimestamp(element.value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);// 2、調用水印生成器(TimestampAssigner)方法,獲取該元素的水印final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(element.value, newTimestamp);// 3、將返回的水印和最近發射的水印進行比較if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {// 如果水印是遞增的,就發射新的水印 currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark);}
因為是對每個元素都會生成水印,所以發射水印可能會很頻繁,適用於元素裡可以明確標識水印的場景。
4.3 ExtractTimestampsOperator
上面兩種發射水位的結合形成的一種方案,即支持周期性水位發射,又支持在每次處理元素時就發射水位。
怎麼給元素分配到合適的窗口?窗口怎麼一個觸發機制?這些問題留待後續探討。