Apache Flink 流處理核心組件 Time&Window 深度解析

2021-12-25 Apache Flink

本文根據 Apache Flink 系列直播課程整理而成,由阿里巴巴高級開發工程師邱從賢(山智)分享,文章將從 Window & Time 介紹、Window API 使用、Window 內部實現三部分內容分享。

Apache Flink (以下簡稱 Flink)是一個天然支持無限流數據處理的分布式計算框架,在 Flink 中 Window 可以將無限流切分成有限流,是處理有限流的核心組件,現在 Flink 中 Window 可以是時間驅動的(Time Window),也可以是數據驅動的(Count Window)。

下面的代碼是在 Flink 中使用 Window 的兩個示例:

從第一部分我們已經知道 Window 的一些基本概念,以及相關 API,下面我們以一個實際例子來看看怎麼使用 Window 相關的 API。

代碼來自 flink-examples:

上面的例子中我們首先會對每條數據進行時間抽取,然後進行 keyby,接著依次調用 window(),evictor(),trigger() 以及 maxBy()。下面我們重點來看 window(), evictor() 和 trigger() 這幾個方法。

2.1 WindowAssigner, Evictor 以及 Trigger


window() 方法接收的輸入是一個 WindowAssigner,WindowAssigner 負責將每條輸入的數據分發到正確的 window 中(一條數據可能同時分發到多個 Window 中),Flink 提供了幾種通用的 WindowAssigner:tumbling window(窗口間的元素無重複),sliding window(窗口間的元素可能重複),session window 以及 global window。如果需要自己定製數據分發策略,則可以實現一個 class,繼承自 WindowAssigner。

evictor() 主要用於做一些數據的自定義操作,可以在執行用戶代碼之前,也可以在執行用戶代碼之後,更詳細的描述可以參考 org.apache.flink.streaming

.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 兩個方法。Flink 提供了如下三種通用的 evictor:

evictor() 是可選的方法,如果用戶不選擇,則默認沒有。

trigger() 用來判斷一個窗口是否需要被觸發,每個 WindowAssigner 都自帶一個默認的 trigger,如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自 Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:

onElement():每次往 window 增加一個元素的時候都會觸發

onEventTime():當 event-time timer 被觸發的時候會調用

onProcessingTime():當 processing-time timer 被觸發的時候會調用

onMerge():對兩個 trigger 的 state 進行 merge 操作

clear():window銷毀的時候被調用

上面的接口中前三個會返回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:


2.2 Time & Watermark


了解完上面的內容後,對於時間驅動的窗口,我們還有兩個概念需要澄清:Time 和 Watermark。

我們知道在分布式環境中 Time 是一個很重要的概念,在 Flink 中 Time 可以分為三種 Event-Time,Processing-Time 以及 Ingestion-Time,三者的關係我們可以從下圖中得知:

Event Time、Ingestion Time、Processing Time 區分:

在 Flink 中我們可以通過下面的方式進行 Time 類型的設置:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 

了解了 Time 之後,我們還需要知道 Watermark 相關的概念。

我們可以考慮一個這樣的例子:某 App 會記錄用戶的所有點擊行為,並回傳日誌(在網絡不好的情況下,先保存在本地,延後回傳)。A 用戶在 11:02 對 App 進行操作,B 用戶在 11:03 操作了 App,但是 A 用戶的網絡不太穩定,回傳日誌延遲了,導致我們在服務端先接受到 B 用戶 11:03 的消息,然後再接受到 A 用戶 11:02 的消息,消息亂序了。

那我們怎麼保證基於 event-time 的窗口在銷毀的時候,已經處理完了所有的數據呢?這就是 watermark 的功能所在。watermark 會攜帶一個單調遞增的時間戳 t,watermark(t) 表示所有時間戳不大於 t 的數據都已經到來了,未來小於等於 t 的數據不會再來,因此可以放心地觸發和銷毀窗口了。下圖中給了一個亂序數據流中的 watermark 例子:


2.3 遲到的數據


上面的 watermark 讓我們能夠應對亂序的數據,但是真實世界中我們沒法得到一個完美的 watermark 數值—要麼沒法獲取到,要麼耗費太大,因此實際工作中我們會使用近似 watermark —生成 watermark(t) 之後,還有較小的概率接受到時間戳t之前的數據,在 Flink 中將這些數據定義為「late elements」, 同樣我們可以在 window 中指定是允許延遲的最大時間(默認為 0),可以使用下面的代碼進行設置:

設置 allowedLateness 之後,遲來的數據同樣可以觸發窗口,進行輸出,利用Flink 的 side output 機制,我們可以獲取到這些遲到的數據,使用方式如下:

需要注意的是,設置了 allowedLateness 之後,遲到的數據也可能觸發窗口,對於  Session window 來說,可能會對窗口進行合併,產生預期外的行為。

在討論 Window 內部實現的時候,我們再通過下圖回顧一下 Window 的生命周期:


每條數據過來之後,會由 WindowAssigner 分配到對應的 Window,當 Window 被觸發之後,會交給 Evictor(如果沒有設置 Evictor 則跳過),然後處理 UserFunction。其中 WindowAssigner,Trigger,Evictor 我們都在上面討論過,而 UserFunction 則是用戶編寫的代碼。

整個流程還有一個問題需要討論:Window 中的狀態存儲。我們知道 Flink 是支持 Exactly Once 處理語義的,那麼 Window 中的狀態存儲和普通的狀態存儲又有什麼不一樣的地方呢?

首先給出具體的答案:從接口上可以認為沒有區別,但是每個 Window 會屬於不同的 namespace,而非 Window 場景下,則都屬於 VoidNamespace,最終由 State/Checkpoint 來保證數據的 Exactly Once 語義,下面我們從 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator 摘取一段代碼進行闡述:

從上面我們可以知道,Window 中的的元素同樣是通過 state 進行維護,然後由 Checkpoint 機制保證 Exactly Once 語義。

至此,Time、Window 相關的所有內容都已經講解完畢,主要包括為什麼要有 Window;Window 中的三個核心組件:WindowAssigner、Trigger 和 Evictor;Window 中怎麼處理亂序數據,亂序數據是否允許延遲,以及怎麼處理遲到的數據;最後我們梳理整個 Window 的數據流程,以及 Window 中怎麼保證 Exactly Once 語義。

史上超強陣容,Flink Forward Asia 2019 你報名了嗎?

點擊圖片可查看 Flink Forward Asia 2019 詳情

相關焦點

  • Flink深度學習流處理核心組件 Time&Window 深度解析
    本文是 Apache Flink 零基礎入門系列文章的第五篇,重點為大家梳理一下 Flink 處理有限流的核心組件
  • 萬字詳解Flink極其重要的Time與Window(建議收藏)
    官方解釋:流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。所以Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的」buckets」桶,我們可以在這些桶上做計算操作。Window類型本文剛開始提到,劃分窗口就兩種方式:根據時間進行截取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    Flink支持根據事件時間處理,數據流中的每條數據都需要具有各自的時間戳,代表著數據的產生時間【事件時間】。在分布式系統中,數據流的採集通常都是有延遲的,可能是網絡原因啊,程序原因啊什麼的。所以當數據到達Flink程序中的時候,問題就來了,這些數據都要進行處理嗎?有可能其中一部分數據已經延遲了好幾個小時了,這對於實時性較強的業務場景是不能容忍的!
  • Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink
    更多優質內容請關注微信公眾號「AI 前線」(ID:ai-front) 關於 Apache Beam 實戰指南系列文章隨著大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    Apache Flink 是一個開源的流處理框架,應用於分布式、高性能、始終可用的、準確的數據流應用程式。Apache Flink 也是高效和分布式的通用數據處理平臺。Apache Flink 聲明式的數據分析開源系統,結合了分布式 MapReduce 類平臺的高效,靈活的編程和擴展性。同時在並行資料庫發現查詢優化方案。
  • Flink Window的5個使用小技巧
    今天分享5個Flink Window的使用小技巧,不過在開始之前,我們先複習幾個核心概念。Window有幾個核心組件:Assigner,負責確定待處理元素所屬的Window;Trigger,負責確定Window何時觸發計算;Evictor,可以用來「清理」Window中的元素;Function,負責處理窗口中的數據;Window是有狀態的,這個狀態和元素的Key以及Window綁定,我們可以抽象的理解為形式為
  • 每天5分鐘Flink -WordCount及Flink SQL
    java依賴:<dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-java</artifactId>    <version>${
  • Apache Flink 誤用之痛
    郵件列表:user@flink.apache.com/user-zh@flink.apache.orgStack Overflow:www.stackoverflow.com2.只有根據流處理的特性結合實際的業務去認真分析需求,才能將 Flink 技術進行恰當的運用。還需要注意,Flink 是流批統一的計算引擎,不是所有的業務都能用流處理或者都能用批處理來實現,需要分析自己的場景適合用哪種方式來實現。3.
  • 一篇文章讓深入理解Flink SQL 時間特性
    在定義 Schema 期間,可以111用.proctime,定義處理時間欄位。          注意,這個 proctime 屬性只能通過附加邏輯欄位,來擴展物理 schema。因此,只能在 schema 定義的末尾定義它。
  • 【Flink】小白級入門,Flink sql 的基礎用法
    Flink sql 是什麼❝sql 的誕生就是為了簡化我們對數據開發,可以使用少量的 sql 代碼,幫助我完成對數據的查詢,分析等功能❞聲明式 & 易於理解對於用戶只需要表達我想要什麼,具體處理邏輯交給框架,系統處理,用戶無需關心,對於一些非專業的開發人員有了解 sql,並且 sql 相對我們學習 java,c 等語言更簡單
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    Cluster 啟動流程深度分析之 Job Manager 啟動6、Flink 源碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動7、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程8、Flink 源碼解析 —— 分析 Streaming WordCount
  • 數據說話:大數據處理引擎Spark與Flink比拼
    Flink 任務圖(來源:https://ci.apache.org/projects/flink/flink-docs-release-1.5/concepts/  Flink 中的狀態支持(來源:https://www.slideshare.net/ParisCarbone/state-management-in-apache-flink-consistent-stateful-distributed-stream-processing)  一般在流處理的時候會比較關注有狀態處理,但是仔細看的話批處理也是會受到影響的
  • Apache Flink 零基礎入門(四):客戶端操作的 5 種模式
    也可以在停止的時候顯示指定 Savepoint 目錄。Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7. ➜ flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7total 32K-rw-r  取消和停止(流作業)的區別如下:• cancel(
  • Apache Flink 1.11 功能前瞻來啦
    [FLIP-111] docker 鏡像統一 之前 Flink 項目中提供了多個不同的 Dockerfile 用來創建 Flink 的 Docker 鏡像,現在他們被統一到了 apache/flink-docker  [1] 項目中。 5.
  • 盤點Flink實戰踩過的坑
    這個是因為動態表不是 append-only 模式的,需要用 toRetractStream ( 回撤流) 處理就好了.檢查flink程序有沒有數據傾斜,可以通過 flink 的 ui 界面查看每個分區子節點處理的數據量。
  • flink sql 知其所以然(十九):Table 與 DataStream 的轉轉轉(附源碼)
    但是 1.14 版本中流批任務的 env 都統一到了 StreamTableEnvironment 中,流批任務中就都可以進行互轉了。import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table
  • FlinkSQL窗口函數篇:易理解與實戰案例
    隨著時間的推移,最早流入實時計算的數據會被處理完成,之後流入的數據處於正在處理狀態。處於正在處理部分的和已處理部分的交界的時間戳,可以被定義為Watermark,代表在此之前的事件已經被處理完成並輸出。針對亂序的流,Watermark也至關重要,即使部分事件延遲到達,也不會影響窗口計算的正確性。
  • 乾貨 | Flink面試大全總結
    Flink的設計思想是以 流 為核心,批是流的特例,擅長處理 無界 和 有界 數據, Flink 提供 精確的時間控制能力 和 有狀態 計算機制,可以輕鬆應對無界數據流,同時 提供 窗口 處理有界數據流。所以被成為流批一體。
  • Flink Join類型介紹
    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/temporal_table_function/SELECT o_amount, r_rateFROM Orders, LATERAL TABLE (Rates(o_proctime
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!