萬字詳解Flink極其重要的Time與Window(建議收藏)

2021-02-14 數據社
前言

Flink 是流式的、實時的 計算引擎

上面一句話就有兩個概念,一個是流式,一個是實時。

流式:就是數據源源不斷的流進來,也就是數據沒有邊界,但是我們計算的時候必須在一個有邊界的範圍內進行,所以這裡面就有一個問題,邊界怎麼確定?無非就兩種方式,根據時間段或者數據量進行確定,根據時間段就是每隔多長時間就劃分一個邊界,根據數據量就是每來多少條數據劃分一個邊界,Flink 中就是這麼劃分邊界的,本文會詳細講解。

實時:就是數據發送過來之後立馬就進行相關的計算,然後將結果輸出。這裡的計算有兩種:

本篇文章所講的 Flink 的內容就是圍繞以上概念進行詳細剖析的!

Time與WindowTime

在Flink中,如果以時間段劃分邊界的話,那麼時間就是一個極其重要的欄位。

Flink中的時間有三種類型,如下圖所示:

Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。

Ingestion Time:是數據進入Flink的時間。

Processing Time:是每一個執行基於時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。

例如,一條日誌進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統時間為2021-01-22 10:00:01.234,日誌的內容如下:  
2021-01-06 18:37:15.624 INFO Fail over to rm2

對於業務來說,要統計1min內的故障日誌個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日誌的生成時間進行統計。

Window

Window,即窗口,我們前面一直提到的邊界就是這裡的Window(窗口)。

官方解釋:流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段

所以Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的」buckets」桶,我們可以在這些桶上做計算操作

Window類型

本文剛開始提到,劃分窗口就兩種方式:

根據時間進行截取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。

根據數據進行截取(data-driven-window),比如每5個數據統計一次或每50個數據統計一次。

窗口類型

對於TimeWindow(根據時間劃分窗口), 可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)

滾動窗口(Tumbling Windows)

將數據依據固定的窗口長度對數據進行切片。

特點:時間對齊,窗口長度固定,沒有重疊

滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。

例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創建如下圖所示:

滾動窗口

適用場景:適合做BI統計等(做每個時間段的聚合計算)。

滑動窗口(Sliding Windows)

滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。

特點:時間對齊,窗口長度固定,有重疊

滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。

例如,你有10分鐘的窗口和5分鐘的滑動,那麼每個窗口中5分鐘的窗口裡包含著上個10分鐘產生的數據,如下圖所示:

滑動窗口

適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。

會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。

特點:時間無對齊

session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session窗口中去。

會話窗口Window APITimeWindow

TimeWindow是將指定時間範圍內的所有數據組成一個window,一次對一個window裡面的所有數據進行計算(就是本文開頭說的對一個邊界內的數據進行計算)。

我們以 紅綠燈路口通過的汽車數量 為例子:

紅綠燈路口會有汽車通過,一共會有多少汽車通過,無法計算。因為車流源源不斷,計算沒有邊界。

所以我們統計每15秒鐘通過紅路燈的汽車數量,如第一個15秒為2輛,第二個15秒為3輛,第三個15秒為1輛 …

我們使用 Linux 中的 nc 命令模擬數據的發送方

11.開啟發送埠,埠號為9999
2nc -lk 9999
3
42.發送內容(key 代表不同的路口,value 代表每次通過的車輛)
5一次發送一行,發送的時間間隔代表汽車經過的時間間隔
69,3
79,2
89,7
94,9
102,6
111,5
122,3
135,7
145,4

Flink 進行採集數據並計算:

1object Window {
2  def main(args: Array[String]): Unit = {
3    //TODO time-window
4    //1.創建運行環境
5    val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7    //2.定義數據流來源
8    val text = env.socketTextStream("localhost", 9999)
9
10    //3.轉換數據格式,text->CarWc
11    case class CarWc(sensorId: Int, carCnt: Int)
12    val ds1: DataStream[CarWc] = text.map {
13      line => {
14        val tokens = line.split(",")
15        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
16      }
17    }
18
19    //4.執行統計操作,每個sensorId一個tumbling窗口,窗口的大小為5秒
20    //也就是說,每5秒鐘統計一次,在這過去的5秒鐘內,各個路口通過紅綠燈汽車的數量。
21    val ds2: DataStream[CarWc] = ds1
22      .keyBy("sensorId")
23      .timeWindow(Time.seconds(5))
24      .sum("carCnt")
25
26    //5.顯示統計結果
27    ds2.print()
28
29    //6.觸發流計算
30    env.execute(this.getClass.getName)
31
32  }
33}

我們發送的數據並沒有指定時間欄位,所以Flink使用的是默認的 Processing Time,也就是Flink系統處理數據時的時間。

1//1.創建運行環境
2val env = StreamExecutionEnvironment.getExecutionEnvironment
3
4//2.定義數據流來源
5val text = env.socketTextStream("localhost", 9999)
6
7//3.轉換數據格式,text->CarWc
8case class CarWc(sensorId: Int, carCnt: Int)
9
10val ds1: DataStream[CarWc] = text.map {
11  line => {
12    val tokens = line.split(",")
13    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
14  }
15}
16//4.執行統計操作,每個sensorId一個sliding窗口,窗口時間10秒,滑動時間5秒
17//也就是說,每5秒鐘統計一次,在這過去的10秒鐘內,各個路口通過紅綠燈汽車的數量。
18val ds2: DataStream[CarWc] = ds1
19  .keyBy("sensorId")
20  .timeWindow(Time.seconds(10), Time.seconds(5))
21  .sum("carCnt")
22
23//5.顯示統計結果
24ds2.print()
25
26//6.觸發流計算
27env.execute(this.getClass.getName)

CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。

注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數

1//1.創建運行環境
2val env = StreamExecutionEnvironment.getExecutionEnvironment
3
4//2.定義數據流來源
5val text = env.socketTextStream("localhost", 9999)
6
7//3.轉換數據格式,text->CarWc
8case class CarWc(sensorId: Int, carCnt: Int)
9
10val ds1: DataStream[CarWc] = text.map {
11  (f) => {
12    val tokens = f.split(",")
13    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
14  }
15}
16//4.執行統計操作,每個sensorId一個tumbling窗口,窗口的大小為5
17//按照key進行收集,對應的key出現的次數達到5次作為一個結果
18val ds2: DataStream[CarWc] = ds1
19  .keyBy("sensorId")
20  .countWindow(5)
21  .sum("carCnt")
22
23//5.顯示統計結果
24ds2.print()
25
26//6.觸發流計算
27env.execute(this.getClass.getName)

同樣也是窗口長度和滑動窗口的操作:窗口長度是5,滑動長度是3

1//1.創建運行環境
2val env = StreamExecutionEnvironment.getExecutionEnvironment
3
4//2.定義數據流來源
5val text = env.socketTextStream("localhost", 9999)
6
7//3.轉換數據格式,text->CarWc
8case class CarWc(sensorId: Int, carCnt: Int)
9
10val ds1: DataStream[CarWc] = text.map {
11  (f) => {
12    val tokens = f.split(",")
13    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
14  }
15}
16//4.執行統計操作,每個sensorId一個sliding窗口,窗口大小3條數據,窗口滑動為3條數據
17//也就是說,每個路口分別統計,收到關於它的3條消息時統計在最近5條消息中,各自路口通過的汽車數量
18val ds2: DataStream[CarWc] = ds1
19  .keyBy("sensorId")
20  .countWindow(5, 3)
21  .sum("carCnt")
22
23//5.顯示統計結果
24ds2.print()
25
26//6.觸發流計算
27env.execute(this.getClass.getName)

flink支持兩種劃分窗口的方式(time和count)

flink支持窗口的兩個重要屬性(size和interval)

如果size=interval,那麼就會形成tumbling-window(無重疊數據)

如果size>interval,那麼就會形成sliding-window(有重疊數據)

如果size<interval,那麼這種窗口將會丟失數據。比如每5秒鐘,統計過去3秒的通過路口汽車的數據,將會漏掉2秒鐘的數據。

通過組合可以得出四種基本窗口

time-tumbling-window 無重疊數據的時間窗口,設置方式舉例:timeWindow(Time.seconds(5))

time-sliding-window  有重疊數據的時間窗口,設置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))

count-tumbling-window無重疊數據的數量窗口,設置方式舉例:countWindow(5)

count-sliding-window 有重疊數據的數量窗口,設置方式舉例:countWindow(5,3)

Window Reduce

WindowedStream → DataStream:給window賦一個reduce功能的函數,並返回一個聚合的結果。

1import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
2import org.apache.flink.api.scala._
3import org.apache.flink.streaming.api.windowing.time.Time
4
5object StreamWindowReduce {
6  def main(args: Array[String]): Unit = {
7    // 獲取執行環境
8    val env = StreamExecutionEnvironment.getExecutionEnvironment
9
10    // 創建SocketSource
11    val stream = env.socketTextStream("node01", 9999)
12
13    // 對stream進行處理並按key聚合
14    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
15
16    // 引入時間窗口
17    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
18
19    // 執行聚合操作
20    val streamReduce = streamWindow.reduce(
21      (item1, item2) => (item1._1, item1._2 + item2._2)
22    )
23
24    // 將聚合數據寫入文件
25    streamReduce.print()
26
27    // 執行程序
28    env.execute("TumblingWindow")
29  }
30}

Window Apply

apply方法可以進行一些自定義處理,通過匿名內部類的方法來實現。當有一些複雜計算時使用。

用法

實現一個 WindowFunction 類

指定該類的泛型為 [輸入數據類型, 輸出數據類型, keyBy中使用分組欄位的類型, 窗口類型]

示例:使用apply方法來實現單詞統計

步驟:

獲取流處理運行環境

構建socket流數據源,並指定IP位址和埠號

對接收到的數據轉換成單詞元組

使用 keyBy 進行分流(分組)

使用 timeWinodw 指定窗口的長度(每3秒計算一次)

實現一個WindowFunction匿名內部類

apply方法中實現聚合計算

使用Collector.collect收集數據

核心代碼如下:

1    //1. 獲取流處理運行環境
2    val env = StreamExecutionEnvironment.getExecutionEnvironment
3
4    //2. 構建socket流數據源,並指定IP位址和埠號
5    val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
6
7    //3. 對接收到的數據轉換成單詞元組
8    val wordDataStream = textDataStream.map(_->1)
9
10    //4. 使用 keyBy 進行分流(分組)
11    val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
12
13    //5. 使用 timeWinodw 指定窗口的長度(每3秒計算一次)
14    val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
15
16    //6. 實現一個WindowFunction匿名內部類
17    val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
18      //在apply方法中實現數據的聚合
19      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
20        println("hello world")
21        val tuple = input.reduce((t1, t2) => {
22          (t1._1, t1._2 + t2._2)
23        })
24        //將要返回的數據收集起來,發送回去
25        out.collect(tuple)
26      }
27    })
28    reduceDatStream.print()
29    env.execute()

Window Fold

WindowedStream → DataStream:給窗口賦一個fold功能的函數,並返回一個fold後的結果。

1import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
2import org.apache.flink.api.scala._
3import org.apache.flink.streaming.api.windowing.time.Time
4
5object StreamWindowFold {
6  def main(args: Array[String]): Unit = {
7    // 獲取執行環境
8    val env = StreamExecutionEnvironment.getExecutionEnvironment
9
10    // 創建SocketSource
11    val stream = env.socketTextStream("node01", 9999,'\n',3)
12
13    // 對stream進行處理並按key聚合
14    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
15
16    // 引入滾動窗口
17    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
18
19    // 執行fold操作
20    val streamFold = streamWindow.fold(100){
21      (begin, item) =>
22        begin + item._2
23    }
24
25    // 將聚合數據寫入文件
26    streamFold.print()
27
28    // 執行程序
29    env.execute("TumblingWindow")
30  }
31}

Aggregation on Window

WindowedStream → DataStream:對一個window內的所有元素做聚合操作。min和 minBy的區別是min返回的是最小值,而minBy返回的是包含最小值欄位的元素(同樣的原理適用於 max 和 maxBy)。

1import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
2import org.apache.flink.streaming.api.windowing.time.Time
3import org.apache.flink.api.scala._
4
5object StreamWindowAggregation {
6  def main(args: Array[String]): Unit = {
7    // 獲取執行環境
8    val env = StreamExecutionEnvironment.getExecutionEnvironment
9
10    // 創建SocketSource
11    val stream = env.socketTextStream("node01", 9999)
12
13    // 對stream進行處理並按key聚合
14    val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
15
16    // 引入滾動窗口
17    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
18
19    // 執行聚合操作
20    val streamMax = streamWindow.max(1)
21
22    // 將聚合數據寫入文件
23    streamMax.print()
24
25    // 執行程序
26    env.execute("TumblingWindow")
27  }
28}

EventTime與WindowEventTime的引入

與現實世界中的時間是不一致的,在flink中被劃分為事件時間,提取時間,處理時間三種。

如果以EventTime為基準來定義時間窗口那將形成EventTimeWindow,要求消息本身就應該攜帶EventTime

如果以IngesingtTime為基準來定義時間窗口那將形成IngestingTimeWindow,以source的systemTime為準。

如果以ProcessingTime基準來定義時間窗口那將形成ProcessingTimeWindow,以operator的systemTime為準。

在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那麼需要引入EventTime的時間屬性,引入方式如下所示:

1val env = StreamExecutionEnvironment.getExecutionEnvironment
2
3// 從調用時刻開始給env創建的每一個stream追加時間特徵
4env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Watermark引入

我們知道,流處理從事件產生,到流經 source,再到 operator,中間是有一個過程和時間的,雖然大部分情況下,流到 operator 的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、背壓等原因,導致亂序的產生,所謂亂序,就是指 Flink 接收到的事件的先後順序不是嚴格按照事件的 Event Time 順序排列的,所以 Flink 最初設計的時候,就考慮到了網絡延遲,網絡亂序等問題,所以提出了一個抽象概念:水印(WaterMark);

如上圖所示,就出現一個問題,一旦出現亂序,如果只根據 EventTime 決定 Window 的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間後,必須觸發 Window 去進行計算了,這個特別的機制,就是 Watermark。

Watermark 是用於處理亂序事件的,而正確的處理亂序事件,通常用 Watermark 機制結合 Window 來實現

數據流中的 Watermark 用於表示 timestamp 小於 Watermark 的數據,都已經到達了,因此,Window 的執行也是由 Watermark 觸發的。

Watermark 可以理解成一個延遲觸發機制,我們可以設置 Watermark 的延時時長 t,每次系統會校驗已經到達的數據中最大的 maxEventTime,然後認定 EventTime 小於 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間等於 maxEventTime – t,那麼這個窗口被觸發執行

有序流的Watermarker如下圖所示:(Watermark設置為0)

有序數據的Watermark

亂序流的Watermarker如下圖所示:(Watermark設置為2)

無序數據的Watermark

當 Flink 接收到每一條數據時,都會產生一條 Watermark,這條 Watermark 就等於當前所有到達數據中的 maxEventTime - 延遲時長,也就是說,Watermark 是由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要晚,那麼就會觸發相應窗口的執行。由於 Watermark 是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發

上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那麼時間戳為7s的事件到達時的Watermarker恰好觸發窗口1,時間戳為12s的事件到達時的Watermark恰好觸發窗口2。

Flink對於遲到數據的處理

waterMark和Window機制解決了流式數據的亂序問題,對於因為延遲而順序有誤的數據,可以根據eventTime進行業務處理,於延遲的數據Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間範圍內仍可以接受處理延遲數據。

設置允許延遲的時間是通過 allowedLateness(lateness: Time) 設置

保存延遲數據則是通過 sideOutputLateData(outputTag: OutputTag[T]) 保存

獲取延遲數據是通過 DataStream.getSideOutput(tag: OutputTag[X]) 獲取

具體的用法如下:

allowedLateness(lateness: Time)

1def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
2  javaStream.allowedLateness(lateness)
3  this
4}

該方法傳入一個Time值,設置允許數據遲到的時間,這個時間和 WaterMark 中的時間概念不同。再來回顧一下:

WaterMark=數據的事件時間-允許亂序時間值

隨著新數據的到來,waterMark的值會更新為最新數據事件時間-允許亂序時間值,但是如果這時候來了一條歷史數據,waterMark值則不會更新。總的來說,waterMark是為了能接收到儘可能多的亂序數據。

那這裡的Time值,主要是為了等待遲到的數據,在一定時間範圍內,如果屬於該窗口的數據到來,仍會進行計算,後面會對計算方式仔細說明

注意:該方法只針對於基於event-time的窗口,如果是基於processing-time,並且指定了非零的time值則會拋出異常。

sideOutputLateData(outputTag: OutputTag[T])

1def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
2  javaStream.sideOutputLateData(outputTag)
3  this
4}

該方法是將遲來的數據保存至給定的outputTag參數,而OutputTag則是用來標記延遲數據的一個對象。

DataStream.getSideOutput(tag: OutputTag[X])

通過window等操作返回的DataStream調用該方法,傳入標記延遲數據的對象來獲取延遲的數據。

對延遲數據的理解

延遲數據是指:

在當前窗口【假設窗口範圍為10-15】已經計算之後,又來了一個屬於該窗口的數據【假設事件時間為13】,這時候仍會觸發 Window 操作,這種數據就稱為延遲數據。

那麼問題來了,延遲時間怎麼計算呢?

假設窗口範圍為10-15,延遲時間為2s,則只要 WaterMark<15+2,並且屬於該窗口,就能觸發 Window 操作。而如果來了一條數據使得 WaterMark>=15+2,10-15這個窗口就不能再觸發 Window 操作,即使新來的數據的 Event Time 屬於這個窗口時間內 。

Flink 關聯 Hive 分區表

Flink 1.12 支持了 Hive 最新的分區作為時態表的功能,可以通過 SQL 的方式直接關聯 Hive 分區表的最新分區,並且會自動監聽最新的 Hive 分區,當監控到新的分區後,會自動地做維表數據的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成 Kafka 流實時關聯最新的 Hive 分區實現數據打寬

具體用法:

在 Sql Client 中註冊 HiveCatalog:

1vim conf/sql-client-defaults.yaml 
2catalogs: 
3  - name: hive_catalog 
4    type: hive 
5    hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件 

創建 Kafka 表

1CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
2    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,  
3proctime as PROCTIME()  -- PROCTIME用來和Hive時態表關聯  
4) WITH (  
5 'connector' = 'kafka',  
6 'topic' = 'topic_name',  
7 'format' = 'json',  
8 'properties.bootstrap.servers' = 'host:9092',  
9 'properties.group.id' = 'flinkTestGroup',  
10 'scan.startup.mode' = 'timestamp',  
11 'scan.startup.timestamp-millis' = '1607844694000'  
12); 

Flink 事實表與 Hive 最新分區數據關聯

dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動態地開啟維表參數。

1CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
2SELECT * FROM  
3 (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,   
4     ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
5    from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
6       JOIN hive_catalog.flink_db.dim_extend_shop_info   
7  /*+ OPTIONS('streaming-source.enable'='true',  
8     'streaming-source.partition.include' = 'latest',  
9     'streaming-source.monitor-interval' = '1 h',
10     'streaming-source.partition-order' = 'partition-name') */
11    FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時態表  
12    ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
13    where groupID in (202042)) t  where t.rn = 1 

參數解釋:

latest 屬性: 只讀取最新分區數據。

all: 讀取全量分區數據 ,默認值為 all,表示讀所有分區,latest 只能用在 temporal join 中,用於讀取最新分區作為維表,不能直接讀取最新分區數據。

streaming-source.monitor-interval 監聽新分區生成的時間、不宜過短 、最短是1 個小時,因為目前的實現是每個 task 都會查詢 metastore,高頻的查可能會對metastore 產生過大的壓力。需要注意的是,1.12.1 放開了這個限制,但仍建議按照實際業務不要配個太短的 interval。

streaming-source.partition-order 分區策略,主要有以下 3 種,其中最為推薦的是 partition-name

partition-name 使用默認分區名稱順序加載最新分區

create-time 使用分區文件創建時間順序

partition-time 使用分區時間順序

相關焦點

  • Flink Window的5個使用小技巧
    在寫外部系統的時候,我們有如下兩種方式:每條消息都發送一次;這種方式延遲較低,但是吞吐也比較低;「積攢」一部分消息,以Batch發送;這種方式延遲增大,但是吞吐提高;在實際生產中,除非對延遲要求非常高,否則使用第一種方式會給外部存儲系統帶來很大的QPS壓力,所以一般建議採用第二種方式。
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    數據在源源不斷的進入flink,我們設置好window的大小為5s,flink會以5s來將每分鐘劃分為連續的多個窗口。進入flink的第一條數據會落在一個時間窗口內,假設數據的事件時間為13s(小時和分不重要,因為窗口大小的度量單位是秒),則落入的窗口是【10-15】。
  • Structured Streaming與Flink比較
    這篇文章主要是幫著大家對於Structured Streaming和flink的主要不同點。文章建議收藏後閱讀。1. 運行模型Structured Streaming 的task運行也是依賴driver 和 executor,當然driver和excutor也還依賴於集群管理器Standalone或者yarn等。
  • Flink CookBook|水位生成機制
    window區間內可以認為有兩個基本的概念window_start_time和window_end_time,顧名思義,window_start_time代表的是區間的起始時間,而window_end_time代表的是window時間區間的結束時間,根據元素時間戳就可以將元素放到合適的窗口。時間戳可以解決元素歸類到窗口的問題,水位解決第二個問題。
  • Flink 端到端 Exactly-once 機制剖析
    但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:      這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    11、flink 啟動時不自動創建 上傳jar的路徑,能指定一個創建好的目錄嗎12、Flink sink to es 集群上報 slot 不夠,單機跑是好的,為什麼?13、Fllink to elasticsearch如何創建索引文檔期時間戳?14、blink有沒有api文檔或者demo,是否建議blink用於生產環境。
  • 一篇文章讓深入理解Flink SQL 時間特性
    在定義 Schema 期間,可以111用.proctime,定義處理時間欄位。          注意,這個 proctime 屬性只能通過附加邏輯欄位,來擴展物理 schema。因此,只能在 schema 定義的末尾定義它。
  • 實戰|Kafka + Flink + Redis 的電商大屏實時計算案
    實時大屏(real-time dashboard)正在被越來越多的企業採用,用來及時呈現關鍵的數據指標。並且在實際操作中,肯定也不會僅僅計算一兩個維度。由於Flink的「真·流式計算」這一特點,它比Spark Streaming要更適合大屏應用。本文從筆者的實際工作經驗抽象出簡單的模型,並簡要敘述計算流程(當然大部分都是源碼)。
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    1.5.5 是 Apache Flink 1.5 系列的第五個 bugfix 版本,包括超過 20 個修復程序和一些小改進,強烈建議所有用戶升級到 Flink 1.5.5,主要改進如下:改進[FLINK-10075] - HTTP connections to a secured REST endpoint
  • flink-1.12.0 upsert-kafka connector demo
    參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh
  • 初中物理:知識點總結大全詳解,真心實用,建議收藏!
    中考每一分都很重要,特別是物理科目,題目比較難,每一題的分值又很高,經常會出現錯個幾道題,分數直接掉個十幾二十分,很是讓人心痛。而物理的那些考試題目又不是想做就能做出來的,越到後面的題越複雜,想多拿一分更不容易。因此,不少同學對物理這門學科是又恨又怕,但又不得不認真對待。
  • Flink保證端到端exactly-once語義(也適用於kafka)
    接下來,我們進一步介紹flink的這個特性:    •flink的checkpoints在保證exactly-once語義是的作用    •flink是如何通過兩部提交協議來保證從數據源到數據輸出的exactly-once語義    •通過一個例子來解釋如果應用TwoPhaseCommitSinkFunction來實現一個exactly-once的sinkexactly-once
  • Flink 窗口之Window機制
    但是,部分求和流可能不是我們想要的,因為它會不斷更新計數,更重要的是,某些信息(例如隨時間變化)會丟失。因此,我們需要想改一下我們的問題:每分鐘通過該位置的汽車數量。如上所述,在數據流上定義窗口是非並行操作。
  • Apache Flink 誤用之痛
    郵件列表:user@flink.apache.com/user-zh@flink.apache.orgStack Overflow:www.stackoverflow.com2.相反,如果結果的準確性十分重要,且下遊不關心重複記錄,那麼僅需設置 exactly-once 模式並使用可回放的數據源。如果下遊要求數據不能重複,哪怕數據正確也只能發送一次,這種時候就對 sink 有更進一步的限制,在 exactly-once 的模式下,使用可回放的數據源,並且 sink 需要支持事務。
  • 5本超1000萬字的已完結長篇玄幻小說,劇情經典,建議收藏慢慢看
    哈嘍,大家好,這裡是阿昆網絡,今天給大家推薦5本超1000萬字的已完結長篇玄幻小說,劇情經典,建議收藏慢慢看,看的好的話,記得收藏關注哦!那個聲音顯得極其不耐煩,或者說是不情願,淡然道:「這麼說很麻煩,你給我進來!」「進……進來?」白晨還沒明白怎麼回事,突然感覺天旋地轉,整個人突然失去意識般,癱倒在地上。眼前飛花霧繞,轉眼間便化作清明,一座金光閃爍的高塔豎立於前。
  • Hospital Window(醫院的窗口)
    His bed was next to the room's only window. The other man had to spend all his time flat on his back. The men talked for hours on end.
  • Light in the Window
    As a freshman she was more than ready to go home for the first time since August.Never one to ignore her mother's advice, Ellie dug up the candles, lit them, said the blessings, placed the menorah on her window sill and spent the rest of the evening in her room studying.
  • Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink
    如果想使用 KafkaIO,必須依賴 beam-sdks-java-io-kafka ,KafkaIO 同時支持多個版本的 Kafka 客戶端,使用時建議用高版本的或最新的 Kafka 版本,因為使用 KafkaIO 的時候需要包含 kafka-clients 的依賴版本。
  • 男性購物時間 shopping window
    喜歡shopping的人大概都知道window shopping吧?對,就是「櫥窗購物」,也就是只看不買。愛逛街的女孩子們肯定都會說這個詞。不過,今天我們要說的是主要針對男同胞的,似乎他們平時不怎麼愛逛街,就算到了商場也是買了東西就走人。他們購物所花的時間就叫做shopping window。