面試官:Flink窗口大小怎麼確定的,開始時間是什麼時候,結束是什麼時候?
這篇文章會很短,但是我認為很有意義,今天晚上微醺了,隨便寫一下,如有不對望不吝指正。
我還記得的在我剛學習flink的時候,B站的老師說過,Flink窗口的開始時間和結束時間和你想的不一樣。那個時候我好像記得老師說過,flink的窗口大小會根據你的時間單位來進行修正。
然後在現如今,很多人還是不是很了解窗口機制,以及watermark。更別提什麼窗口什麼時候,什麼時候結束。所以呢,今天從源碼角度給大家普及一下窗口什麼時候開始,什麼時候結束。
我們可以來編寫一個簡單的代碼,來看一下效果,我習慣用java來寫flink,所以也就使用java了。
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
//獲取窗口開始時間
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
} public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
//窗口開始計算的時間
return timestamp - (timestamp - offset + windowSize) % windowSize;
}我們可以看出來窗口開始時間, 是取模過後的時間,我們來簡單的分析一番。
假如我們第一條數據的時間戳是1000,offset暫時不需要管,因為他是時間的偏移量例如,東八區什麼的。我們假如窗口大小是5s,
那麼接下來的公式計算也就是 1000 - (1000 - 0 + 5000)% 5000,那麼我們可以計算出來的結果就是0,也就是說,窗口開始的時間是0.更大的時間窗口大小,各位大佬可以下面自己算一下。
也就是說開始時間是 0,結束的時間窗口也就是4999,因為到5000的時候就觸發計算了。那麼我們接下來就進行驗證一番和我們分析的是否一致。
接下來我們寫一個簡單的wordcount,因為在多並行度下,不是很好分析,我們設置為單並行讀。如果有對watermark還不是很理解的大佬,可以看我的這篇文章,https://blog.csdn.net/weixin_43704599/article/details/117411252
/**
* @author :qingzhi.wu
* @date :2022/1/18 8:36 下午
*/
public class WindowSizeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> source1 = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.of(0, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(String s) {
return Long.parseLong(s.split(",")[0]);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> map = source1.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return Tuple2.of(s.split(",")[1],1);
}
});
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = map.keyBy(0).window(TumblingEventTimeWindows.of(Time.of(5000, TimeUnit.MILLISECONDS)));
window.sum(1).print();
env.execute();
}
}
接下來我們,就看看我們分析的是否正確
很明顯是正確的。那麼一天的窗口大小你會計算嗎?