Flink 流模式跑離線任務

2021-02-14 Flink菜鳥

通常的認識是:Flink 流模式跑流任務,批模式跑批任務,用流模式跑離線任務也是個有意思的事情

雖然新版 Flink 已經在 sql 上實現了一定程度的流批一體,但是 DataStream 和 DataSet API 還是相差比較大的

用 Flink 跑離線任務也是機緣巧合(也是必然,畢竟我不會 Spark)

現在的項目組經常會跑歷史數據,當然是批模式的,在用 Flink batch 被遇到各種各樣的問題困擾之後

深入思考了我們需要跑的歷史任務的特性,將流模式的的概念稍微變通一下,也可以達到離線的效果

這一切都有一個前提:Flink 任務的算子,在處理完全部數據後,就自動退出了,基於這個前提,就可以達到離線任務的效果

比如,看下面的 Mysql Source

class MysqlSourceFunction(sql: String) extends RichSourceFunction[String] {
private val logger = LoggerFactory.getLogger("MysqlSourceFunction") private var connection: java.sql.Connection = _ private var queryPS: PreparedStatement = null private var count: Long = 0
override def open(parameters: Configuration): Unit = { connection = DriverManager.getConnection(Common.MYSQL_URL, Common.MYSQL_USER, Common.MYSQL_PASS) queryPS = connection.prepareStatement(sql) }
override def run(ctx: SourceFunction.SourceContext[String]): Unit = { logger.info("star query userId")
val resultSet = queryPS.executeQuery() while (resultSet.next()) { count += 1 val userId = resultSet.getString(1) ctx.collect(userId) } queryPS.close() connection.close() logger.info("query userId finish, load {} item, source exit", count) }
override def cancel(): Unit = { }}

任務啟動後會從 source 開始執行,MysqlSourceFunction 就是 MySQL 查詢數據,
並將查詢出來的數據用流的形式,調用 ctx.collect(userId) 一條一條的發送到下遊算子,
在處理完數據後,MysqlSourceFunction 的 run 方法執行完成,Flink 會自動將 Source 標記為 「FINISH」

後續的算子也是一樣,雖然是流模式的任務,Source 完成後,後續的算子也會依次完成,
跟批模式的區別是,所以算子都是同步執行的,Source 還在繼續生產數據,Sink 也在同步的輸出之前的數據,
而批的任務必須要上一算子完成,才會開始執行下一個算子。

 

 

基於這樣的前提,開始設計我的跑批任務的流程序。

簡單介紹下離線任務要做的東西:我們主要做的事情是對用戶進行實時的檢測,並輸出一下我們任務有異常的行為數據,判斷異常需要對該用戶的歷史行為進行分析,離線任務就是對用戶的歷史行為分析的程序。

真正做的事情就是,基於每個用戶,從存儲引擎中拿出該用戶一定時間範圍的歷史數據,對歷史數據統計、分類等處理後,輸出處理的結果,用於實時檢測程序,Sink 完成後,會自動釋放資源,提交對應的輸出句柄(比如寫 HDFS 會提交文件)。

可以使用流模式來跑這樣的離線任務的前提是,離線程序是基於每個用戶的歷史行為的統計,而不是像 BI 報表分析一樣,將所有用戶的數據聚合起來再分析。

到這來就簡單了,所有這樣的離線任務都分成三部分,Source -> Process -> Sink

Source 中根據不同的需求,生產不同的數據
Process 中根據收到的不同的 userId,從存儲引擎中拖出所有該用戶的數據,所有統計、分類邏輯都在這部分完成
Sink 只負責結果的輸出

目前我處理離線數據除了需要用機器學習庫的任務,都全部用 Flink 實現,並且性能和資源消耗會比用 Spark 還小(在我不懈的努力(推薦下) 我們組已經有其他人開始嘗試用 Flink 這種方式跑離線任務了)

 分區策略選擇

用這種弄方式跑離線任務和真正的流任務在分區策略的選擇上會有所不同,通常在流模式的任務中,在最主要的處理算子都會用keyBy 算子,讓下遊的算子可以使用 Flink 的 Key State,

而由於流任務並不想批任務一樣資源使用達到配置的全部,分區數據傾斜的問題,並不一定會暴露(通常給流任務的資源會根據任務高峰時期的資源來配置),流任務需要輸入數據達到高峰,才會達到任務處理的瓶頸。

而在這種流式的離線任務中,輸入數據一直處於最高位,數據傾斜的問題就會比較明顯的感知到,其中最明顯的影響就是導致任務實際需要的時間,大於計算的理論時間。

即使使用 rebalance 這種均衡的分區策略,也會在離線任務數據處理的時間不一致上,影響任務完成的時間。

 

部分並發完成全部數據標記完成,其他算子還未完成,導致實際完成時間,大於任務啟動時,根據數據處理速度計算的大致任務耗時

 下面圖可以看出,數據傾斜導致任務執行時間超過預期的情況,數據少的並發,2小時多點就已經完成了,數據多的現象,跑了 3 個半小時,還沒有結束

 

 

 

即使在數據均衡的情況下,也會有上面的情況,只是沒有這麼極端,如下圖

 

 

 

 甚至集群的負載也會對任務有比較大的影響,之前一次集群某些集群負載很高的時候(伺服器的 CPU 達到 90%),會出現,負載高的機器上的任務會比負載低的機器出了慢很多

這些都是影響任務完成時間的因素,當然這些也是影響真正流任務的因素,不過因為流任務尖峰時間有限(而且也有削峰的策略),所以影響不太嚴重。

在流模式的離線任務中由於一直處於最高負載(source 數據生產速度達到最高,很短時間就生產全部數據),所以影響很大。

 註:Flink 還不能根據下遊算子的處理速度動態的調整上遊的數據分區策略

相關焦點

  • Structured Streaming與Flink比較
    這個可以用來構建自己的自己的集群任務管理框架。ExecutionGraph: JobManager 根據 JobGraph 生成的分布式執行圖,是調度層最核心的數據結構。從spark2.3開始,只有在輸出模式為append的流查詢才能使用join,其他輸出模式暫不支持。從spark2.3開始,在join之前不允許使用no-map-like操作。以下是不能使用的例子。7.
  • Flink保證端到端exactly-once語義(也適用於kafka)
    flink的checkpoint包含了,flink應用現在的狀態與數據輸入流的位置(對於kafka來說就是offset) checkpoint可異步的持久化到像s3或者hdfs這樣子的存儲系統裡面。如果flink應用失敗或者升級時,可以拉取checkpoint中的狀態來恢復上次失敗時的數據。
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    22、flink on yarn jobmanager的HA需要怎麼配置。還是說yarn給管理了23、有兩個數據流就行connect,其中一個是實時數據流(kafka 讀取),另一個是配置流。由於配置流是從關係型資料庫中讀取,速度較慢,導致實時數據流流入數據的時候,配置信息還未發送,這樣會導致有些實時數據讀取不到配置信息。
  • 數據說話:大數據處理引擎Spark與Flink比拼
    Flink 任務圖(來源:https://ci.apache.org/projects/flink/flink-docs-release-1.5/concepts/runtime.html)  在 DAG
  • Apache Flink 誤用之痛
    在開發的過程中,其實要認認真真的來規劃我們的切入點,首先,要從簡單的任務開始循序漸進。如果不在乎結果的正確性,可以考慮用 at-least-once 的模式配置並使用可回放的數據源。相反,如果結果的準確性十分重要,且下遊不關心重複記錄,那麼僅需設置 exactly-once 模式並使用可回放的數據源。
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!
  • Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink
    開發者經常要用到不同的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增加了選擇合適工具和框架的難度,開發者想要將所有的大數據組件熟練運用幾乎是一項不可能完成的任務。這個是支持的,因為批也是一種流,是一種有界的流。Beam 結合了 Flink,Flink dataset 底層也是轉換成流進行處理的。 4. Flink 流批寫程序的時候和 Beam 有什麼不同?底層是 Flink 還是 Beam?
  • 一篇文章讓深入理解Flink SQL 時間特性
    _import org.apache.flink.table.api.Tableimport org.apache.flink.table.api.scala.注意,必須在轉換的數據流中分配時間戳和 watermark。                 在將數據流轉換為表時,有兩種定義時間屬性的方法。
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    Apache Flink 是一個開源的流處理框架,應用於分布式、高性能、始終可用的、準確的數據流應用程式。Apache Flink 也是高效和分布式的通用數據處理平臺。Apache Flink 聲明式的數據分析開源系統,結合了分布式 MapReduce 類平臺的高效,靈活的編程和擴展性。同時在並行資料庫發現查詢優化方案。
  • 開篇|揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?
    Flink 社區很早就設想過將批數據看作一個有界流數據,將批處理看作流計算的一個特例,從而實現流批統一,阿里巴巴的 Blink 團隊在這方面做了大量的工作,已經實現了 Table API & SQL 層的流批統一。 幸運的是,阿里巴巴已經將 Blink 開源回饋給 Flink 社區。
  • 萬字詳解Flink極其重要的Time與Window(建議收藏)
    流式:就是數據源源不斷的流進來,也就是數據沒有邊界,但是我們計算的時候必須在一個有邊界的範圍內進行,所以這裡面就有一個問題,邊界怎麼確定?無非就兩種方式,根據時間段或者數據量進行確定,根據時間段就是每隔多長時間就劃分一個邊界,根據數據量就是每來多少條數據劃分一個邊界,Flink 中就是這麼劃分邊界的,本文會詳細講解。
  • flink-1.12.0 upsert-kafka connector demo
    參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    在分布式系統中,數據流的採集通常都是有延遲的,可能是網絡原因啊,程序原因啊什麼的。所以當數據到達Flink程序中的時候,問題就來了,這些數據都要進行處理嗎?有可能其中一部分數據已經延遲了好幾個小時了,這對於實時性較強的業務場景是不能容忍的!
  • Flink流批一體在阿里雙11首次落地的背後
    今年由 Flink 團隊和數據平臺團隊共同推動的流批一體計算框架在雙 11 數據核心場景成功首秀,也得到了阿里數據中臺負責人朋新宇在業務層的認可:流批一體在技術上,實現了哪怕是多個計算處理模式,也只需要撰寫一套代碼就能兼容。在計算速度上比其他框架快1倍、查詢快4倍,給小二們搭建數據報表提升了4-10倍的速度。同時,由於"一體化"的特性,能實現實時與離線數據的完全一致。
  • Spark Streaming|Flink|Storm|Kafka Streams如何選擇流處理框架
    什麼是流/流處理:流處理的最優雅的定義是:一種數據處理引擎,其設計時考慮了無限的數據集。與批處理不同,批處理以工作中的開始和結束為界,而工作是在處理有限數據之後完成的,而流處理則是指連續不斷地處理天,月,年和永久到來的無邊界數據。
  • Flink SQL 實戰:HBase 的結合應用
    ;CREATE TABLE flink_rtdw.demo.hbase_dim_table ( rowkey STRING, cf ROW < adspace_name STRING >, PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dim_hbase','sink.buffer-flush.max-rows
  • Flink 端到端 Exactly-once 機制剖析
    但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:      這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。
  • 解析| 深入了解Apache Flink的網絡協議棧
    Flink 的網絡協議棧是組成 flink-runtime 模塊的核心組件之一,是每個 Flink 作業的核心。它連接所有 TaskManager 的各個子任務(Subtask),因此,對於 Flink 作業的性能包括吞吐與延遲都至關重要。
  • Spark Streaming 和 Flink 誰是數據開發者的最愛?
    Flink 運行時的角色(standalone 模式)主要有:Jobmanager:協調分布式執行,他們調度任務、協調 checkpoints、協調故障恢復等。至少有一個 JobManager。以上兩種模型編程機構近似,只是在 API 和內部數據獲取有些區別,新版本的已經取消了基於 receiver 這種模式,企業中通常採用基於 direct Dstream 的模式。
  • Flink CookBook|水位生成機制
    水位本質上是個時間戳,在flink中,水位是單調遞增的,表示數據處理的進度,當watermark >=window_end_time,即水位線大於window_end_time 時才會真正觸發窗口計算。實際上會有多種觸發機制,窗口相關的內容會在後續文章裡細聊。