Flink窗口的開始時間是什麼?

2022-01-30 好胖子的大數據之路

面試官:Flink窗口大小怎麼確定的,開始時間是什麼時候,結束是什麼時候?


這篇文章會很短,但是我認為很有意義,今天晚上微醺了,隨便寫一下,如有不對望不吝指正。


我還記得的在我剛學習flink的時候,B站的老師說過,Flink窗口的開始時間和結束時間和你想的不一樣。那個時候我好像記得老師說過,flink的窗口大小會根據你的時間單位來進行修正。

然後在現如今,很多人還是不是很了解窗口機制,以及watermark。更別提什麼窗口什麼時候,什麼時候結束。所以呢,今天從源碼角度給大家普及一下窗口什麼時候開始,什麼時候結束。

我們可以來編寫一個簡單的代碼,來看一下效果,我習慣用java來寫flink,所以也就使用java了。

  @Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
     //獲取窗口開始時間
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}

  public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
   //窗口開始計算的時間
return timestamp - (timestamp - offset + windowSize) % windowSize;
}

我們可以看出來窗口開始時間, 是取模過後的時間,我們來簡單的分析一番。

假如我們第一條數據的時間戳是1000,offset暫時不需要管,因為他是時間的偏移量例如,東八區什麼的。我們假如窗口大小是5s,

那麼接下來的公式計算也就是 1000 - (1000 - 0 + 5000)% 5000,那麼我們可以計算出來的結果就是0,也就是說,窗口開始的時間是0.更大的時間窗口大小,各位大佬可以下面自己算一下。

也就是說開始時間是 0,結束的時間窗口也就是4999,因為到5000的時候就觸發計算了。那麼我們接下來就進行驗證一番和我們分析的是否一致。

接下來我們寫一個簡單的wordcount,因為在多並行度下,不是很好分析,我們設置為單並行讀。如果有對watermark還不是很理解的大佬,可以看我的這篇文章,https://blog.csdn.net/weixin_43704599/article/details/117411252

/**
* @author :qingzhi.wu
* @date :2022/1/18 8:36 下午
*/
public class WindowSizeTest {
   public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
       DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

       SingleOutputStreamOperator<String> source1 = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.of(0, TimeUnit.MILLISECONDS)) {
           @Override
           public long extractTimestamp(String s) {

               return Long.parseLong(s.split(",")[0]);
          }
      });

       SingleOutputStreamOperator<Tuple2<String, Integer>> map = source1.map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String s) throws Exception {
               return Tuple2.of(s.split(",")[1],1);
          }
      });

       WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = map.keyBy(0).window(TumblingEventTimeWindows.of(Time.of(5000, TimeUnit.MILLISECONDS)));
       window.sum(1).print();
       env.execute();
  }
}


接下來我們,就看看我們分析的是否正確

很明顯是正確的。那麼一天的窗口大小你會計算嗎?


相關焦點

  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    這時候水印就應運而生了,水印的目的就是為了解決亂序的數據問題,可以在時間窗口內根據事件時間來進行業務處理,對於亂序的有延遲的數據可以在一定時間範圍內進行等待,那這個時間範圍是怎麼計算的呢?數據在源源不斷的進入flink,我們設置好window的大小為5s,flink會以5s來將每分鐘劃分為連續的多個窗口。
  • FlinkSQL窗口函數篇:易理解與實戰案例
    Watermark為什麼要引入由於實時計算的輸入數據是持續不斷的,因此我們需要一個有效的進度指標,來幫助我們確定關閉時間窗口的正確時間點,保證關閉窗口後不會再有數據進入該窗口,可以安全輸出這個窗口的聚合結果。而Watermark就是一種衡量Event Time進展的有效機制。
  • Structured Streaming與Flink比較
    時間概念三種處理時間:事件時間,注入時間,處理時間。對於基於事件時間的處理flink和Structured Streaming都是支持watemark機制,窗口操作基於watermark和事件時間可以對滯後事件做相應的處理,雖然聽起來這是個好事,但是整體來說watermark就是雞肋,它會導致結果數據輸出滯後,比如watermark是一個小時,窗口一個小時,那麼數據輸出實際上會延遲兩個小時,這個時候需要進行一些處理。
  • 每天5分鐘Flink -WordCount及Flink SQL
    flink.version}</version>  <!大致處理的流程如上所示,現在來一步一步實現這個案例。先開始創建 Flink 的運行環境:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();然後指定了數據 Source 源,以及 Source 源的一些配置:
  • 一篇文章讓深入理解Flink SQL 時間特性
    前言         基於時間的操作(比如 Table API 和 SQL 中窗口操作),需要定義相關的時間語義和時間數據來源的信息。所以,Table 可以提供一個邏輯上的時間欄位,用於在表處理程序中,指示時間和訪問相應的時間戳。
  • 盤點Flink實戰踩過的坑
    數據傾斜導致子任務積壓業務背景一個流程中,有兩個重要子任務:一是數據遷移,將kafka實時數據落Es,二是將kafka數據做窗口聚合落hbase,兩個子任務接的是同一個Topic GroupId。上遊 Topic 的 tps 高峰達到5-6w。
  • 【Flink】小白級入門,Flink sql 的基礎用法
    Flink sql 是什麼❝sql 的誕生就是為了簡化我們對數據開發,可以使用少量的 sql 代碼,幫助我完成對數據的查詢,分析等功能❞聲明式 & 易於理解對於用戶只需要表達我想要什麼,具體處理邏輯交給框架,系統處理,用戶無需關心,對於一些非專業的開發人員有了解 sql,並且 sql 相對我們學習 java,c 等語言更簡單
  • 萬字詳解Flink極其重要的Time與Window(建議收藏)
    Window類型本文剛開始提到,劃分窗口就兩種方式:根據時間進行截取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。根據數據進行截取(data-driven-window),比如每5個數據統計一次或每50個數據統計一次。
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    2、flink中watermark究竟是如何生成的,生成的規則是什麼,怎麼用來處理亂序數據3、消費kafka數據的時候,如果遇到了髒數據,或者是不符合規則的數據等等怎麼處理呢?4、在Kafka 集群中怎麼指定讀取/寫入數據到指定broker或從指定broker的offset開始消費?5、Flink能通過oozie或者azkaban提交嗎?
  • 乾貨 | Flink面試大全總結
    -基於時間的滾動窗口Flink中還支持一個特殊的窗口:會話窗口SessionWindowssession窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況
  • Flink寫入hive測試
    序言:前段時間測試讀取hive數據,今天測試寫入hive一、插件版本 flink 1.11.1  hive 2.3.6
  • Apache Flink 流處理核心組件 Time&Window 深度解析
    上面的例子中我們首先會對每條數據進行時間抽取,然後進行 keyby,接著依次調用 window(),evictor(),trigger() 以及 maxBy()。),sliding window(窗口間的元素可能重複),session window 以及 global window。
  • Flink 窗口之Window機制
    Flink 的 API 在數據流上有非常靈活的窗口定義,使其能在其他開源流處理器中脫穎而出。在這篇文章中,我們主要討論用於流處理的窗口的概念,介紹 Flink 的內置窗口,並說明其對自定義窗口語義的支持。1. 什麼是窗口?它們有什麼用?我們拿交通傳感器的例子來說明,傳感器每15秒統計通過某個位置的車輛數量。結果流看起來像如下所示:
  • Apache Flink 誤用之痛
    項目開始在開始開發前,我們需要選擇正確的切入方式,以下幾種往往是最糟糕的開始:a) 從一個具有挑戰性的用例開始(端對端的 Exactly-once、大狀態、複雜的業務邏輯、強實時SLA的組合) b) 之前沒有流處理經驗 c) 不對團隊做相關的培訓 d) 不利用社區
  • Flink深度學習流處理核心組件 Time&Window 深度解析
    >clear() window 銷毀的時候被調用上面的接口中前三個會返回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:2.2 Time & Watermark了解完上面的內容後,對於時間驅動的窗口
  • 數據說話:大數據處理引擎Spark與Flink比拼
    Flink 任務圖(來源:https://ci.apache.org/projects/flink/flink-docs-release-1.5/concepts/比如常見的窗口聚合,如果批處理的數據時間段比窗口大,是可以不考慮狀態的,用戶邏輯經常會忽略這個問題。但是當批處理時間段變得比窗口小的時候,一個批的結果實際上依賴於以前處理過的批。這時,因為批處理引擎一般沒有這個需求不會有很好的內置支持,維護狀態就成為了用戶需要解決的事情。比如窗口聚合的情況用戶就要加一個中間結果表記住還沒有完成的窗口的結果。這樣當用戶把批處理時間段變短的時候就會發現邏輯變複雜了。
  • Flink + Debezium CDC 實現原理及代碼實戰
    在介紹 Debezium 之前,我們要先了解一下什麼是 Kafka Connect。二、Kafka Connect 介紹Kafka 相信大家都很熟悉,是一款分布式,高性能的消息隊列框架。一般情況下,讀寫 Kafka 數據,都是用 Consumer 和 Producer  Api 來完成,但是自己實現這些需要去考慮很多額外的東西,比如管理 Schema,容錯,並行化,數據延遲,監控等等問題。
  • Flink Join類型介紹
    https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/temporal_table_function/SELECT o_amount, r_rateFROM Orders, LATERAL TABLE (Rates(o_proctime
  • flink-1.12.0 upsert-kafka connector demo
    參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh
  • Flink保證端到端exactly-once語義(也適用於kafka)
    flink的checkpoint包含了,flink應用現在的狀態與數據輸入流的位置(對於kafka來說就是offset) checkpoint可異步的持久化到像s3或者hdfs這樣子的存儲系統裡面。如果flink應用失敗或者升級時,可以拉取checkpoint中的狀態來恢復上次失敗時的數據。