FlinkSQL窗口函數篇:易理解與實戰案例

2022-01-07 大數據羊說

點擊上方關注【大數據左右手

加技術吐槽群,獲取更多資料

前言

本篇以通俗的語言來介紹FlinkSQL版的窗口函數。目的是從最容易理解與近實戰案例方式去讓讀者獲得收益。

本篇是FlinkSQL實戰的開篇,歡迎收藏,轉發與持續關注

Watermark為什麼要引入

由於實時計算的輸入數據是持續不斷的,因此我們需要一個有效的進度指標,來幫助我們確定關閉時間窗口的正確時間點,保證關閉窗口後不會再有數據進入該窗口,可以安全輸出這個窗口的聚合結果。

而Watermark就是一種衡量Event Time進展的有效機制。隨著時間的推移,最早流入實時計算的數據會被處理完成,之後流入的數據處於正在處理狀態。處於正在處理部分的和已處理部分的交界的時間戳,可以被定義為Watermark,代表在此之前的事件已經被處理完成並輸出。

針對亂序的流,Watermark也至關重要,即使部分事件延遲到達,也不會影響窗口計算的正確性。此外,並行數據流中,當算子(Operator)有多個輸入流時,算子的Event Time以最小流Event Time為準。

watermark策略

Flink SQL提供了幾種常用的watermark策略。

嚴格意義上遞增的時間戳,發出到目前為止已觀察到的最大時間戳的水印。時間戳小於最大時間戳的行不會遲到。
watermark for rowtime_column as rowtime_column

遞增的時間戳,發出到目前為止已觀察到的最大時間戳為負1的水印。時間戳等於或小於最大時間戳的行不會遲到。
watermark for rowtime_column as rowtime_column - INTERVAL '1' SECOND.

有界時間戳(亂序)發出水印,它是觀察到的最大時間戳減去指定的延遲。
watermark for rowtime_column as rowtime_column - INTERVAL'5'SECOND

是5秒的延遲水印策略。

watermark for rowtime_column as rowtime_column - INTERVAL 'string' timeUnit

語法格式樣例
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

窗口的劃分和觸發時機

以通俗的語言達到你理解,這是主要目的。

窗口劃分源碼
/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows.
  * @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
 return timestamp - (timestamp - offset + windowSize) % windowSize;
}

計算邏輯
window_start = 
   timestamp - (timestamp - offset + windowSize) % windowSize

window_end = window_start + windowSize

以左閉右開計算

[window_start,window_end)

介紹

timestamp:進來的時間(event_time)

offset: 窗口啟動的偏移量

windowSize:設定的窗口大小

例:第一次進來的時間為 

2021-11-06 20:13:00

按3分鐘為窗口大小,offset為0,所以:

window_start = 13-(13-0+3)%3 =12


所以這條數據落到 

[2021-11-06 20:12:00 2021-11-06 20:15:00)

這個窗口內。

窗口觸發計算時機

watermark(水位線,包含延遲) > 窗口結束時間

滾動窗口用法與場景滾動窗口定義

滾動窗口(TUMBLE)將每個元素分配到一個指定大小的窗口中。通常,滾動窗口有一個固定的大小,並且不會出現重疊。例如,如果指定了一個5分鐘大小的滾動窗口,無限流的數據會根據時間劃分為[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。

語法

TUMBLE函數用在GROUP BY子句中,用來定義滾動窗口。

TUMBLE(< time-attr>, < size-interval>) < size-interval>: INTERVAL 'string' timeUnit



TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

data: 是一個表參數。
timecol: 是一個列描述符指示應該映射到哪個時間的屬性列。
size: 是一個持續時間指定窗口的寬度。

用法與場景用法

窗口大小三分鐘,允許遲到一分鐘。

創建表
    val sql=
      """
        |CREATE TABLE Bid(
        |bidtime STRING,
        |price DECIMAL(10, 2),
        |item STRING,
        |t as TO_TIMESTAMP(bidtime),
        |WATERMARK  FOR t AS t - INTERVAL '1' MINUTES
        |)
        |WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'flink_sql_1',
        |  'properties.group.id'='flink_sql_group_1',
        |  'properties.bootstrap.servers' = 'xxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(sql)

查詢
    val query=
      """
        |SELECT window_start, window_end, SUM(price) price FROM TABLE(
        |   TUMBLE(
        |     DATA => TABLE Bid,
        |     TIMECOL => DESCRIPTOR(t),
        |     SIZE => INTERVAL '3' MINUTES))
        | GROUP BY window_start, window_end
        |""".stripMargin
    tableEnv.executeSql(query).print()
  }

測試數據
{"bidtime":"2021-11-04 19:05:00.0","price":4,"item":"A" }
{"bidtime":"2021-11-04 19:07:00.0","price":4,"item":"C" }
{"bidtime":"2021-11-04 19:09:00.0","price":4,"item":"B" }
{"bidtime":"2021-11-04 19:11:00.0","price":4,"item":"D" }
{"bidtime":"2021-11-04 19:13:00.0","price":4,"item":"F" }
{"bidtime":"2021-11-04 19:15:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:17:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:19:00.0","price":4,"item":"E" }
{"bidtime":"2021-11-04 19:21:00.0","price":4,"item":"E" }

結果
+----+++-+
| op |            window_start |              window_end | price|
+----+++-+
| +I | 2021-11-04 19:03:00.000 | 2021-11-04 19:06:00.000 |  4.00|
| +I | 2021-11-04 19:06:00.000 | 2021-11-04 19:09:00.000 |  4.00|
| +I | 2021-11-04 19:09:00.000 | 2021-11-04 19:12:00.000 |  8.00|
| +I | 2021-11-04 19:12:00.000 | 2021-11-04 19:15:00.000 |  4.00|
| +I | 2021-11-04 19:15:00.000 | 2021-11-04 19:18:00.000 |  8.00|
.

以方便看的時間格式展示(hh:mm)事件時間水位線窗口觸發計算的窗口19:0519:04[19:03,19:06)
19:0719:06[19:06,19:09)[19:03,19:06)19:0919:08[19:09,19:12)
19:1119:10[19:09,19:12)[19:06,19:09)19:1319:12[19:12,19:15)[19:09,19:12)19:1519:14[19:15,19:18)
19:1719:16[19:15,19:18)[19:12,19:15)19:1919:18[19:18,19:21)[19:15,19:18)19:2119:20[19:18,19:21)
場景或實戰使用場景

分鐘級別聚合常用場景。比如:統計每個用戶每分鐘在指定網站的單擊數

實戰創建結構
 val kafka_sql=
        """
          |CREATE TABLE user_clicks (
          |  user_name  VARCHAR ,
          |  click_url VARCHAR ,
          |  update_time  BIGINT,
          |  t as TO_TIMESTAMP(FROM_UNIXTIME(update_time/1000,'yyyy-MM-dd HH:mm:ss')),
          |  WATERMARK FOR t AS t - INTERVAL '2' SECOND
          |) WITH (
          |  'connector' = 'kafka',
          |  ''topic' = 'flink_sql_1',
          |  'properties.group.id'='flink_sql_group_1',
          |  'properties.bootstrap.servers' = 'xxxx',
          |  'format' = 'json'
          |)
          |""".stripMargin
    tableEnv.executeSql(kafka_sql)

查詢邏輯
 val query=
        """
          | SELECT
          | user_name,
          | count(click_url) as pv,
          | TUMBLE_START(t, INTERVAL '1' MINUTE) as t_start,
          | TUMBLE_END(t, INTERVAL '1' MINUTE) as t_end
          | FROM user_clicks
          | GROUP BY TUMBLE(t, INTERVAL '1' MINUTE), user_name
          |""".stripMargin
    tableEnv.executeSql(query).print()

數據與結果
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:00.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:10.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:49.0"
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:05.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:58.0"}
{"user_name":"bo","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:02:11.0"}

+----+--+--+++
| op |                      user_name |                   pv |            window_start |              window_end |
+----+--+--+++
| +I |                           wang |                    3 | 2021-11-06 10:00:00.000 | 2021-11-06 10:01:00.000 |
| +I |                           wang |                    2 | 2021-11-06 10:01:00.000 | 2021-11-06 10:02:00.000 |

滑動窗口用法與場景滾動窗口定義

滑動窗口(HOP),也被稱作Sliding Window。不同於滾動窗口,滑動窗口的窗口可以重疊。

通常,大部分元素符合多個窗口情景,窗口是重疊的。因此,滑動窗口在計算移動平均數(moving averages)時很實用。例如,計算過去5分鐘數據的平均值,每10秒鐘更新一次,可以設置slide為10秒,size為5分鐘。下圖為您展示間隔為30秒,窗口大小為1分鐘的滑動窗口。

語法
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

data: 是一個表參數,數據表。

timecol: 是一個列描述符指示應該映射到哪個時間的屬性列。

slide: 滑動時間。

size: 是一個持續時間指定窗口的寬度。

slide < size,則窗口會重疊,每個元素會被分配到多個窗口。slide = size,則等同於滾動窗口(TUMBLE)。slide > size,則為跳躍窗口,窗口之間不重疊且有間隙。具體用法可以參考滾動窗口,以下介紹下實戰場景或實戰使用場景

計算過去5分鐘數據的平均值,每10秒鐘更新一次,可以設置slide為10秒,size為5分鐘。類似這種的都可以去使用滑動窗口。

實戰

統計每個用戶過去1分鐘的單擊次數,每30秒更新1次,即1分鐘的窗口,30秒滑動1次。

創建結構
 val sql=
      """
        |CREATE TABLE user_clicks (
        |  user_name  VARCHAR ,
        |  click_url VARCHAR ,
        |  ts TIMESTAMP(3),
        |  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
        |)
        |WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'flink_sql_1',
        |  'properties.group.id'='flink_sql_group_1',
        |  'properties.bootstrap.servers' = 'devcdh1:9092,devcdh2:9092,devcdh3:9092',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(sql)

查詢邏輯
val query=
      """
        |SELECT
        | user_name,
        | count(click_url) as pv,
        | HOP_START (ts, INTERVAL '30' SECOND,INTERVAL '1' MINUTE) as window_start,
        | HOP_END (ts,INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end
        | FROM user_clicks
        | GROUP BY HOP(ts,INTERVAL '30' SECOND, INTERVAL '1' MINUTE), user_name
        |""".stripMargin
    tableEnv.executeSql(query).print()

數據與結果
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:00.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:10.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:00:49.0"
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:05.0"}
{"user_name":"wang","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:01:58.0"}
{"user_name":"bo","click_url":"http://weixin.qq.com","ts":"2021-11-06 10:02:11.0"}

+----+--+--+++
| op |                      user_name |                   pv |            window_start |              window_end |
+----+--+--+++
| +I |                           wang |                    2 | 2021-11-06 09:59:30.000 | 2021-11-06 10:00:30.000 |
| +I |                           wang |                    3 | 2021-11-06 10:00:00.000 | 2021-11-06 10:01:00.000 |
| +I |                           wang |                    2 | 2021-11-06 10:00:30.000 | 2021-11-06 10:01:30.000 |
| +I |                           wang |                    2 | 2021-11-06 10:01:00.000 | 2021-11-06 10:02:00.000 |

會話窗口用法與場景會話窗口定義

通過SESSION活動來對元素進行分組。會話窗口與滾動窗口和滑動窗口相比,沒有窗口重疊,沒有固定窗口大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該窗口就會關閉。

會話窗口通過一個間隔時間(Gap)來配置,這個間隔定義了非活躍周期的長度。

語法
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit

場景或實戰使用場景

例如,一個表示滑鼠單擊活動的數據流可能具有長時間的空閒時間,並在兩段空閒之間散布著高濃度的單擊。如果數據在指定的間隔(Gap)之後到達,則會開始一個新的窗口。

實戰

統計每個用戶在每個活躍會話期間的單擊次數,會話超時時長為30秒。

查詢邏輯

創建表和上面一樣,不再羅列

SELECT
 user_name,
 count(click_url) as pv,
 SESSION_START (ts, INTERVAL '30' SECOND) as window_start,
 SESSION_END (ts,INTERVAL '30' SECOND) as window_end
 FROM user_clicks
 GROUP BY SESSION(ts,INTERVAL '30' SECOND), user_name

結果
+----+--+--+++
| op |                      user_name |                   pv |            window_start |              window_end |
+----+--+--+++
| +I |                           wang |                    2 | 2021-11-06 10:00:00.000 | 2021-11-06 10:00:40.000 |
| +I |                           wang |                    2 | 2021-11-06 10:00:49.000 | 2021-11-06 10:01:35.000 |

溫馨提醒

FlinkSQL window函數與connector kafka結合計算,在本地測試或者在伺服器上運行的時候設置了並行度為1,如果遇到在自己理解上,並沒有數據print()或者少數據的時候,你考慮下kafka分區的關係。

因為目前FlinkSQL是不支持source/sink並行度配置的,FlinkSQL中各算子並行度默認是根據source的partition數或文件數來決定的,比如常用的kafka source topic的partition是3,那麼FlinkSQL任務的並發就是3。

又因為每個task維護單獨的watermark。雖然你在全局設置了並行度為1,認為全部數據進入一個task,在某個時刻應該觸發了計算,然而實際情況並沒有觸發計算的,那在這個時候你就要考慮kafka分區帶來的影響。

tEnv.getConfig()
  .addConfiguration(
    new Configuration()
    .set(CoreOptions.DEFAULT_PARALLELISM, 1)
    );

其他函數

Window Aggregation

Group Aggregation

Over Aggregation

可參考往期

FlinkSQL窗口,讓你眼前一亮,是否可以大吃一驚呢

「注:」 本篇案例參考以下

實時計算Flink版:

https://help.aliyun.com/document_detail/62510.html

Flink官網: 

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/

關注回復關鍵字「加群」你可以提各種技術、產品、管理、認知等問題。群中特色:每天早上安排一個話題討論,對學習和後期找工作有大幫助,討論結果整理在線文檔隨後發出。在群力群策中形成知識庫。

動動小手,讓更多需要的人看到~

相關焦點

  • 【Flink】小白級入門,Flink sql 的基礎用法
    Flink sql 是什麼❝sql 的誕生就是為了簡化我們對數據開發,可以使用少量的 sql 代碼,幫助我完成對數據的查詢,分析等功能❞聲明式 & 易於理解對於用戶只需要表達我想要什麼,具體處理邏輯交給框架,系統處理,用戶無需關心,對於一些非專業的開發人員有了解 sql,並且 sql 相對我們學習 java,c 等語言更簡單
  • 每天5分鐘Flink -WordCount及Flink SQL
    今天是 Flink 系列的第二篇。目標:通過每天一小會兒,熟悉 Flink 的方方面面,為後面算法實現提供工具基礎。包括 Streaming 和 Batch 以及 SQL 的簡單案例。上述所有的 Flink 語義都會在後面分篇章詳細贅述。基礎配置首先pom.xml 中要配置的依賴是:provided 選項在這表示此依賴只在代碼編譯的時候使用,運行和打包的時候不使用。
  • flink sql 知其所以然(一)| source\sink 原理
    感謝您的關注  +  點讚 + 再看,對博主的肯定,會督促博主持續的輸出更多的優質實戰內容!!!1.序篇-本文結構本文從以下五個小節介紹 flink sql source\sink\format 的概念、原理。
  • 一篇文章讓深入理解Flink SQL 時間特性
    前言         基於時間的操作(比如 Table API 和 SQL 中窗口操作),需要定義相關的時間語義和時間數據來源的信息。所以,Table 可以提供一個邏輯上的時間欄位,用於在表處理程序中,指示時間和訪問相應的時間戳。
  • Flink SQL 實戰:HBase 的結合應用
    在 Flink SQL 實戰系列第二篇中介紹了如何註冊 Flink Mysql table,我們可以將廣告位表抽取到 HBase 表中,用來做維度表,進行 temporal table join。因此,我們需要在 HBase 中創建一張表,同時還需要創建 Flink HBase table, 這兩張表通過 Flink SQL 的 HBase connector 關聯起來。
  • 通俗易懂的學會:SQL窗口函數
    在日常工作中,經常會遇到需要在每組內排名,比如下面的業務需求:排名問題:每個部門按業績來排名topN問題:找出每個部門排名前N的員工進行獎勵面對這類需求,就需要使用sql的高級功能窗口函數了。二.什麼是窗口函數?
  • 鞏固SQL - 窗口函數&變量&數據透視圖
    但作為合格的一個數據分析師,sql的精通肯定是必不可少的,所以最近瘋狂刷sql題,同時也來總結下我以前比較少用的語法。(工作寫的是hive,為方便演示,本文章均使用Mysql8.0.16版本)一、窗口函數1、什麼是窗口函數窗口函數,也叫OLAP函數(Online Anallytical Processing,聯機分析處理),可以對資料庫數據進行實時分析處理。
  • SQL 窗口函數是什麼?漲見識了!
    窗口函數(Window Function) 是 SQL2003 標準中定義的一項新特性,並在 SQL2011、SQL2016 中又加以完善,添加了若干處拓展。窗口函數不同於我們熟悉的普通函數和聚合函數,它為每行數據進行一次計算:輸入多行(一個窗口)、返回一個值。
  • Flink sql 之 TopN 與 StreamPhysicalRankRule (源碼解析)
    基於flink1.14的源碼做解析公司內有很多業務方都在使用我們Flink sql平臺做TopN的計算,今天同事突然問到我
  • Structured Streaming與Flink比較
    Flink作為一個很好用的實時處理框架,也支持批處理,不僅提供了API的形式,也可以寫sql文本。這篇文章主要是幫著大家對於Structured Streaming和flink的主要不同點。文章建議收藏後閱讀。1.
  • Flink窗口的開始時間是什麼?
    這篇文章會很短,但是我認為很有意義,今天晚上微醺了,隨便寫一下,如有不對望不吝指正。我還記得的在我剛學習flink的時候,B站的老師說過,Flink窗口的開始時間和結束時間和你想的不一樣。那個時候我好像記得老師說過,flink的窗口大小會根據你的時間單位來進行修正。
  • 踩坑記 | flink sql count 還有這種坑!
    ❝感謝您的「關注  +  點讚 + 再看」,對博主的肯定,會督促博主持續的輸出更多的實戰內容!!!❞1.序篇通過本文你可了解到先說下結論:在非窗口類 flink sql 任務中,會存在 retract 機制,即上遊會向下遊發送「撤回消息(做減法)」,**最新的結果消息(做加法)**兩條消息來計算結果,保證結果正確性。
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    (本篇文章由於很多連結都不能直接打開,可以在公眾號內回復 Flink 獲得更友好本篇文章閱讀體驗或者在文章後面點擊 閱讀原文)本項目結構除了《從1到100深入學習Flink》源碼學習這個系列文章,《從0到1學習Flink》的案例文章也會優先在知識星球更新,讓大家先通過一些 demo 學習 Flink,再去深入源碼學習!
  • SQL、Pandas、Spark:窗口函數的3種實現
    在分析上述需求之前,首先對窗口函數進行介紹。何為窗口函數呢?既然窗口函數這個名字源於資料庫,那麼我們就援引其在資料庫中的定義。下圖源於MySQL8.0的官方文檔,從標黃高亮的一句介紹可知:窗口函數是用與當前行有關的數據行參與計算。這個翻譯可能有些蹩腳,但若能感性理解窗口函數的話,其實反而會覺得其概括的比較傳神。
  • Python與資料庫交互——窗口函數
    窗口函數可以分為靜態窗口函數和動態窗口函數 。靜態窗口函數的窗口大小是固定的,不會因為記錄的不同而不同;動態窗口函數的窗口大小會隨著記錄的不同而變化。(Tips:計算移動平均值特別有用)窗口函數的語法為:函數 OVER([PARTITION BY 欄位名 ORDER BY 欄位名 ASC|DESC ROWS BETWEEN … AND …])●  PARTITION BY子句:指定窗口函數按照哪些欄位進行分組。分組後,窗口函數可以在每個分組中分別執行。●  ORDER BY子句:指定窗口函數按照哪些欄位進行排序。
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    很多同學仍然理解不了。事實上這跟Flink的文檔不全有直接關係。在這個問題上官網的資料不夠,學習成本巨大。我總結了我之前發過的很多文章,同時參考了這篇文章:http://uee.me/cTWVu作者是:hlp207希望這篇文章能解答讀者在這個問題上的困惑。本文結合源碼和實例講解。
  • flink sql 知其所以然(十九):Table 與 DataStream 的轉轉轉(附源碼)
    博主會把重要的知識點的原理進行剖析,讓小夥伴萌做到深入淺出1.序篇源碼公眾號後臺回復1.13.2 table datastream獲取。3.Table 與 DataStream API 的轉換具體實現3.1.先看一個官網的簡單案例官網的案例主要是讓大家看看要做到 Table 與 DataStream API 的轉換會涉及到使用哪些接口。
  • OpenCV 實戰 | 深入理解回調函數
    這個題一點也不難,因為書上給的例程已經可以完成大部分工作,只需要自己添加幾行代碼就可以實現上述功能,但添加這幾行代碼的過程可以幫助你對滑鼠回調函數有一個清楚的理解。首先我們先看一個回調函數原理:回調函數就是一個通過函數指針調用的函數。
  • SQL必備:case when函數與窗口函數
    學會使用窗口函數將會使得一些關於排名或累加的複雜問題的求解變得非常簡便。例如牛客網SQL76:寫一個sql語句查詢各個崗位分數的中位數位置上的所有grade信息,並且按id升序排序。這裡涉及到了中位數的信息,用窗口函數會很方便。
  • 盤點Flink實戰踩過的坑
    數據傾斜導致子任務積壓業務背景一個流程中,有兩個重要子任務:一是數據遷移,將kafka實時數據落Es,二是將kafka數據做窗口聚合落hbase,兩個子任務接的是同一個Topic GroupId。上遊 Topic 的 tps 高峰達到5-6w。