點擊上方關注【大數據左右手】
加技術吐槽群,獲取更多資料
前言本篇以通俗的語言來介紹FlinkSQL版的窗口函數。目的是從最容易理解與近實戰案例方式去讓讀者獲得收益。
本篇是FlinkSQL實戰的開篇,歡迎收藏,轉發與持續關注。
Watermark為什麼要引入由於實時計算的輸入數據是持續不斷的,因此我們需要一個有效的進度指標,來幫助我們確定關閉時間窗口的正確時間點,保證關閉窗口後不會再有數據進入該窗口,可以安全輸出這個窗口的聚合結果。
而Watermark就是一種衡量Event Time進展的有效機制。隨著時間的推移,最早流入實時計算的數據會被處理完成,之後流入的數據處於正在處理狀態。處於正在處理部分的和已處理部分的交界的時間戳,可以被定義為Watermark,代表在此之前的事件已經被處理完成並輸出。
針對亂序的流,Watermark也至關重要,即使部分事件延遲到達,也不會影響窗口計算的正確性。此外,並行數據流中,當算子(Operator)有多個輸入流時,算子的Event Time以最小流Event Time為準。
watermark策略Flink SQL提供了幾種常用的watermark策略。
嚴格意義上遞增的時間戳,發出到目前為止已觀察到的最大時間戳的水印。時間戳小於最大時間戳的行不會遲到。watermark for rowtime_column as rowtime_column
遞增的時間戳,發出到目前為止已觀察到的最大時間戳為負1的水印。時間戳等於或小於最大時間戳的行不會遲到。watermark for rowtime_column as rowtime_column - INTERVAL '1' SECOND.
有界時間戳(亂序)發出水印,它是觀察到的最大時間戳減去指定的延遲。watermark for rowtime_column as rowtime_column - INTERVAL'5'SECOND
是5秒的延遲水印策略。
watermark for rowtime_column as rowtime_column - INTERVAL 'string' timeUnit
語法格式樣例CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
窗口的劃分和觸發時機以通俗的語言達到你理解,這是主要目的。
窗口劃分源碼/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
計算邏輯window_start =
timestamp - (timestamp - offset + windowSize) % windowSize
window_end = window_start + windowSize
以左閉右開計算
[window_start,window_end)
介紹timestamp:進來的時間(event_time)
offset: 窗口啟動的偏移量
windowSize:設定的窗口大小
例:第一次進來的時間為
2021-11-06 20:13:00
按3分鐘為窗口大小,offset為0,所以:
window_start = 13-(13-0+3)%3 =12
所以這條數據落到
[2021-11-06 20:12:00 2021-11-06 20:15:00)
這個窗口內。
窗口觸發計算時機watermark(水位線,包含延遲) > 窗口結束時間
滾動窗口用法與場景滾動窗口定義滾動窗口(TUMBLE)將每個元素分配到一個指定大小的窗口中。通常,滾動窗口有一個固定的大小,並且不會出現重疊。例如,如果指定了一個5分鐘大小的滾動窗口,無限流的數據會根據時間劃分為[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。
語法TUMBLE函數用在GROUP BY子句中,用來定義滾動窗口。
TUMBLE(< time-attr>, < size-interval>) < size-interval>: INTERVAL 'string' timeUnit
或
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
data: 是一個表參數。
timecol: 是一個列描述符指示應該映射到哪個時間的屬性列。
size: 是一個持續時間指定窗口的寬度。
用法與場景用法窗口大小三分鐘,允許遲到一分鐘。
創建表val sql=
"""
|CREATE TABLE Bid(
|bidtime STRING,
|price DECIMAL(10, 2),
|item STRING,
|t as TO_TIMESTAMP(bidtime),
|WATERMARK FOR t AS t - INTERVAL '1' MINUTES
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'flink_sql_1',
| 'properties.group.id'='flink_sql_group_1',
| 'properties.bootstrap.servers' = 'xxxx',
| 'format' = 'json'
|)
|""".stripMargin
tableEnv.executeSql(sql)
查詢val query=
"""
|SELECT window_start, window_end, SUM(price) price FROM TABLE(
| TUMBLE(
| DATA => TABLE Bid,
| TIMECOL => DESCRIPTOR(t),
| SIZE => INTERVAL '3' MINUTES))
| GROUP BY window_start, window_end
|""".stripMargin
tableEnv.executeSql(query).print()
}
測試數據{"bidtime":"2021-11-04 19:05:00.0","price":4,"item":"A" }
{"bidtime":"2021-11-04 19:07:00.0","price":4,"item":"C" }
{"bidtime":"2021-11-04 19:09:00.0","price":4,"item":"B" }
{"bidtime":"2021-11-04 19:11:00.0","price":4,"item":"D" }
{"bidtime":"2021-11-04 19:13:00.0","price":4,"item":"F" }
{"bidtime":"2021-11-04 19:15:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:17:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:19:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:21:00.0","price":4,"item":"E" }
結果+----+++-+
| op | window_start | window_end | price|
+----+++-+
| +I | 2021-11-04 19:03:00.000 | 2021-11-04 19:06:00.000 | 4.00|
| +I | 2021-11-04 19:06:00.000 | 2021-11-04 19:09:00.000 | 4.00|
| +I | 2021-11-04 19:09:00.000 | 2021-11-04 19:12:00.000 | 8.00|
| +I | 2021-11-04 19:12:00.000 | 2021-11-04 19:15:00.000 | 4.00|
| +I | 2021-11-04 19:15:00.000 | 2021-11-04 19:18:00.000 | 8.00|
.
以方便看的時間格式展示(hh:mm)事件時間水位線窗口觸發計算的窗口19:0519:04[19:03,19:06)
19:0719:06[19:06,19:09)[19:03,19:06)19:0919:08[19:09,19:12)
19:1119:10[19:09,19:12)[19:06,19:09)19:1319:12[19:12,19:15)[19:09,19:12)19:1519:14[19:15,19:18)
19:1719:16[19:15,19:18)[19:12,19:15)19:1919:18[19:18,19:21)[19:15,19:18)19:2119:20[19:18,19:21)
場景或實戰使用場景分鐘級別聚合常用場景。比如:統計每個用戶每分鐘在指定網站的單擊數
實戰創建結構val kafka_sql=
"""
|CREATE TABLE user_clicks (
| user_name VARCHAR ,
| click_url VARCHAR ,
| update_time BIGINT,
| t as TO_TIMESTAMP(FROM_UNIXTIME(update_time/1000,'yyyy-MM-dd HH:mm:ss')),
| WATERMARK FOR t AS t - INTERVAL '2' SECOND
|) WITH (
| 'connector' = 'kafka',
| ''topic' = 'flink_sql_1',
| 'properties.group.id'='flink_sql_group_1',
| 'properties.bootstrap.servers' = 'xxxx',
| 'format' = 'json'
|)
|""".stripMargin
tableEnv.executeSql(kafka_sql)
查詢邏輯val query=
"""
| SELECT
| user_name,
| count(click_url) as pv,
| TUMBLE_START(t, INTERVAL '1' MINUTE) as t_start,
| TUMBLE_END(t, INTERVAL '1' MINUTE) as t_end
| FROM user_clicks
| GROUP BY TUMBLE(t, INTERVAL '1' MINUTE), user_name
|""".stripMargin
tableEnv.executeSql(query).print()
數據與結果{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:00.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:10.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:49.0"
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:05.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:58.0"}
{"user_name":"bo","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:02:11.0"}
+----+--+--+++
| op | user_name | pv | window_start | window_end |
+----+--+--+++
| +I | wang | 3 | 2021-11-06 10:00:00.000 | 2021-11-06 10:01:00.000 |
| +I | wang | 2 | 2021-11-06 10:01:00.000 | 2021-11-06 10:02:00.000 |
滑動窗口用法與場景滾動窗口定義滑動窗口(HOP),也被稱作Sliding Window。不同於滾動窗口,滑動窗口的窗口可以重疊。
通常,大部分元素符合多個窗口情景,窗口是重疊的。因此,滑動窗口在計算移動平均數(moving averages)時很實用。例如,計算過去5分鐘數據的平均值,每10秒鐘更新一次,可以設置slide為10秒,size為5分鐘。下圖為您展示間隔為30秒,窗口大小為1分鐘的滑動窗口。
語法HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])data: 是一個表參數,數據表。
timecol: 是一個列描述符指示應該映射到哪個時間的屬性列。
slide: 滑動時間。
size: 是一個持續時間指定窗口的寬度。
slide < size,則窗口會重疊,每個元素會被分配到多個窗口。slide = size,則等同於滾動窗口(TUMBLE)。slide > size,則為跳躍窗口,窗口之間不重疊且有間隙。具體用法可以參考滾動窗口,以下介紹下實戰場景或實戰使用場景計算過去5分鐘數據的平均值,每10秒鐘更新一次,可以設置slide為10秒,size為5分鐘。類似這種的都可以去使用滑動窗口。
實戰統計每個用戶過去1分鐘的單擊次數,每30秒更新1次,即1分鐘的窗口,30秒滑動1次。
創建結構val sql=
"""
|CREATE TABLE user_clicks (
| user_name VARCHAR ,
| click_url VARCHAR ,
| ts TIMESTAMP(3),
| WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'flink_sql_1',
| 'properties.group.id'='flink_sql_group_1',
| 'properties.bootstrap.servers' = 'devcdh1:9092,devcdh2:9092,devcdh3:9092',
| 'format' = 'json'
|)
|""".stripMargin
tableEnv.executeSql(sql)
查詢邏輯val query=
"""
|SELECT
| user_name,
| count(click_url) as pv,
| HOP_START (ts, INTERVAL '30' SECOND,INTERVAL '1' MINUTE) as window_start,
| HOP_END (ts,INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end
| FROM user_clicks
| GROUP BY HOP(ts,INTERVAL '30' SECOND, INTERVAL '1' MINUTE), user_name
|""".stripMargin
tableEnv.executeSql(query).print()
數據與結果{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:00.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:10.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:49.0"
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:05.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:58.0"}
{"user_name":"bo","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:02:11.0"}
+----+--+--+++
| op | user_name | pv | window_start | window_end |
+----+--+--+++
| +I | wang | 2 | 2021-11-06 09:59:30.000 | 2021-11-06 10:00:30.000 |
| +I | wang | 3 | 2021-11-06 10:00:00.000 | 2021-11-06 10:01:00.000 |
| +I | wang | 2 | 2021-11-06 10:00:30.000 | 2021-11-06 10:01:30.000 |
| +I | wang | 2 | 2021-11-06 10:01:00.000 | 2021-11-06 10:02:00.000 |
會話窗口用法與場景會話窗口定義通過SESSION活動來對元素進行分組。會話窗口與滾動窗口和滑動窗口相比,沒有窗口重疊,沒有固定窗口大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該窗口就會關閉。
會話窗口通過一個間隔時間(Gap)來配置,這個間隔定義了非活躍周期的長度。
語法SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
場景或實戰使用場景例如,一個表示滑鼠單擊活動的數據流可能具有長時間的空閒時間,並在兩段空閒之間散布著高濃度的單擊。如果數據在指定的間隔(Gap)之後到達,則會開始一個新的窗口。
實戰統計每個用戶在每個活躍會話期間的單擊次數,會話超時時長為30秒。
查詢邏輯創建表和上面一樣,不再羅列
SELECT
user_name,
count(click_url) as pv,
SESSION_START (ts, INTERVAL '30' SECOND) as window_start,
SESSION_END (ts,INTERVAL '30' SECOND) as window_end
FROM user_clicks
GROUP BY SESSION(ts,INTERVAL '30' SECOND), user_name
結果+----+--+--+++
| op | user_name | pv | window_start | window_end |
+----+--+--+++
| +I | wang | 2 | 2021-11-06 10:00:00.000 | 2021-11-06 10:00:40.000 |
| +I | wang | 2 | 2021-11-06 10:00:49.000 | 2021-11-06 10:01:35.000 |
溫馨提醒FlinkSQL window函數與connector kafka結合計算,在本地測試或者在伺服器上運行的時候設置了並行度為1,如果遇到在自己理解上,並沒有數據print()或者少數據的時候,你考慮下kafka分區的關係。
因為目前FlinkSQL是不支持source/sink並行度配置的,FlinkSQL中各算子並行度默認是根據source的partition數或文件數來決定的,比如常用的kafka source topic的partition是3,那麼FlinkSQL任務的並發就是3。
又因為每個task維護單獨的watermark。雖然你在全局設置了並行度為1,認為全部數據進入一個task,在某個時刻應該觸發了計算,然而實際情況並沒有觸發計算的,那在這個時候你就要考慮kafka分區帶來的影響。
tEnv.getConfig()
.addConfiguration(
new Configuration()
.set(CoreOptions.DEFAULT_PARALLELISM, 1)
);
其他函數Window Aggregation
Group Aggregation
Over Aggregation
可參考往期
FlinkSQL窗口,讓你眼前一亮,是否可以大吃一驚呢
「注:」 本篇案例參考以下
實時計算Flink版:
https://help.aliyun.com/document_detail/62510.html
Flink官網:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/
關注回復關鍵字「加群」你可以提各種技術、產品、管理、認知等問題。群中特色:每天早上安排一個話題討論,對學習和後期找工作有大幫助,討論結果整理在線文檔隨後發出。在群力群策中形成知識庫。動動小手,讓更多需要的人看到~