開篇|揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

2021-01-11 阿里云云棲號

本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、內部實現等各個方面,帶你由淺入深地全面了解 Flink SQL。

1. 發展歷程

今年的8月22日 Apache Flink 發布了1.9.0 版本(下文簡稱1.9),在 Flink 1.9 中,Table 模塊迎來了核心架構的升級,引入了阿里巴巴Blink團隊貢獻的諸多功能,本文對Table 模塊的架構進行梳理並介紹如何使用 Blink Planner。

Flink 的 Table 模塊 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,用戶可以像操作表一樣操作數據,非常直觀和方便;SQL作為一種聲明式語言,有著標準的語法和規範,用戶可以不用關心底層實現即可進行數據的處理,非常易於上手,Flink Table API 和 SQL 的實現上有80%左右的代碼是公用的。作為一個流批統一的計算引擎,Flink 的 Runtime 層是統一的,但在 Flink 1.9 之前,Flink API 層 一直分為DataStream API 和 DataSet API, Table API & SQL 位於 DataStream API 和 DataSet API 之上。

Flink 1.8 Table 架構

在 Flink 1.8 架構裡,如果用戶需要同時流計算、批處理的場景下,用戶需要維護兩套業務代碼,開發人員也要維護兩套技術棧,非常不方便。 Flink 社區很早就設想過將批數據看作一個有界流數據,將批處理看作流計算的一個特例,從而實現流批統一,阿里巴巴的 Blink 團隊在這方面做了大量的工作,已經實現了 Table API & SQL 層的流批統一。 幸運的是,阿里巴巴已經將 Blink 開源回饋給 Flink 社區。為了實現 Flink 整個體系的流批統一,在結合 Blink 團隊的一些先行經驗的基礎上,Flink 社區的開發人員在多輪討論後,基本敲定了Flink 未來的技術架構。

Flink 未來架構

在Flink 的未來架構中,DataSet API將被廢除,面向用戶的API只有 DataStream API 和 Table API & SQL,在實現層,這兩個API共享相同的技術棧,使用統一的 DAG 數據結構來描述作業,使用統一的 StreamOperator 來編寫算子邏輯,以及使用統一的流式分布式執行引擎,實現徹底的流批統一。 這兩個API都提供流計算和批處理的功能,DataStream API 提供了更底層和更靈活的編程接口,用戶可以自行描述和編排算子,引擎不會做過多的幹涉和優化;Table API & SQL 則提供了直觀的Table API、標準的SQL支持,引擎會根據用戶的意圖來進行優化,並選擇最優的執行計劃。

2.Flink 1.9 Table 架構

Blink 的 Table 模塊的架構在開源時就已經實現了流批統一,向著 Flink 的未來架構邁進了第一步,走在了 Flink 社區前面。 因此在 Flink 1.9 合入 Blink Table 代碼時,為了保證 Flink Table 已有架構和 Blink Table的架構能夠並存並朝著 Flink 未來架構演進,社區的開發人員圍繞FLIP-32(FLIP 即 Flink Improvement Proposals,專門記錄一些對Flink 做較大修改的提議。FLIP-32是:Restructure flink-table for future contributions) 進行了重構和優化,從而使得 Flink Table 的新架構具備了流批統一的能力,可以說 Flink 1.9 是 Flink 向著流批徹底統一這個未來架構邁出的第一步。

Flink 1.9 Table 架構

在 Flink Table 的新架構中,有兩個查詢處理器:Flink Query Processor 和 Blink Query Processor,分別對應兩個Planner,我們稱之為 Old Planner 和 Blink Planner。查詢處理器是 Planner 的具體實現, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程將 Table API & SQL作業轉換成 Flink Runtime 可識別的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的調度和執行。

Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API;而 Blink 的查詢處理器則實現流批作業接口的統一,底層的 API 都是Transformation。

3.Flink Planner 與 Blink Planner

Flink Table 的新架構實現了查詢處理器的插件化,社區完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,用戶可以自行選擇使用 Old Planner 還是 Blink Planner。

在模型上,Old Planner 沒有考慮流計算作業和批處理作業的統一,針對流計算作業和批處理作業的實現不盡相同,在底層會分別翻譯到 DataStream API 和 DataSet API 上。而 Blink Planner 將批數據集看作 bounded DataStream (有界流式數據) ,流計算作業和批處理作業最終都會翻譯到 Transformation API 上。 在架構上,Blink Planner 針對批處理和流計算,分別實現了BatchPlanner 和 StreamPlanner ,兩者共用了大部分代碼,共享了很多優化邏輯。 Old Planner 針對批處理和流計算的代碼實現的是完全獨立的兩套體系,基本沒有實現代碼和優化邏輯復用。

除了模型和架構上的優點外,Blink Planner 在阿里巴巴集團內部的海量業務場景下沉澱了許多實用功能,集中在三個方面:

Blink Planner 對代碼生成機製做了改進、對部分算子進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的數據傾斜優化等新功能。Blink Planner 的優化策略是基於公共子圖的優化算法,包含了基於成本的優化(CBO)和基於規則的優化(CRO)兩種策略,優化更為全面。同時,Blink Planner 支持從 catalog 中獲取數據源的統計信息,這對CBO優化非常重要。Blink Planner 提供了更多的內置函數,更標準的 SQL 支持,在 Flink 1.9 版本中已經完整支持 TPC-H ,對高階的 TPC-DS 支持也計劃在下一個版本實現。整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出於穩定性的考慮,Flink 1.9 默認依然使用 Flink Planner,用戶如果需要使用 Blink Planner,可以作業中顯式指定。

4.如何啟用 Blink Planner

在IDE環境裡,只需要引入兩個 Blink Planner 的相關依賴,就可以啟用 Blink Planner。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.9.0</version></dependency>

對於流計算作業和批處理作業的配置非常類似,只需要在 EnvironmentSettings 中設置 StreamingMode 或 BatchMode 即可,流計算作業的設置如下:

// **********************// BLINK STREAMING QUERY// **********************import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.java.StreamTableEnvironment;StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);bsTableEnv.sqlUpdate(…);bsTableEnv.execute();

批處理作業的設置如下 :

// ******************// BLINK BATCH QUERY// ******************import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);bbTableEnv.sqlUpdate(…)bbTableEnv.execute()

如果作業需要運行在集群環境,打包時將 Blink Planner 相關依賴的 scope 設置為 provided,表示這些依賴由集群環境提供。這是因為 Flink 在編譯打包時, 已經將 Blink Planner 相關的依賴打包,不需要再次引入,避免衝突。

5. 社區長遠計劃

目前,TableAPI & SQL 已經成為 Flink API 的一等公民,社區也將投入更大的精力在這個模塊。在不遠的將來,待 Blink Planner 穩定之後,將會作為默認的 Planner ,而 Old Planner 也將會在合適的時候退出歷史的舞臺。目前社區也在努力賦予 DataStream 批處理的能力,從而統一流批技術棧,屆時 DataSet API 也將退出歷史的舞臺。

相關焦點

  • flink-1.12.0 upsert-kafka connector demo
    (PlannerBase.scala:353) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.
  • Flink寫入hive測試
    <flink.version>1.11.1</flink.version>    <scala.binary.version>2.11</scala.binary.version>  </properties>  <repositories>    <repository
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    11、flink 啟動時不自動創建 上傳jar的路徑,能指定一個創建好的目錄嗎12、Flink sink to es 集群上報 slot 不夠,單機跑是好的,為什麼?13、Fllink to elasticsearch如何創建索引文檔期時間戳?14、blink有沒有api文檔或者demo,是否建議blink用於生產環境。
  • 寫在阿里Blink正式開源之際
    至於說到 流批覆用,我就默默提一下 structured streaming ,  完全架構在 spark sql 之上,一張無限流動的大表。看文章中重點強調的 BinaryRow 和  codegen 優化方式,這都是 spark 玩了很久很久的東西了,不了解的小夥伴,需要多看看相關的,後續spark君會專門抽文章介紹。
  • Structured Streaming與Flink比較
    Flink作為一個很好用的實時處理框架,也支持批處理,不僅提供了API的形式,也可以寫sql文本。這篇文章主要是幫著大家對於Structured Streaming和flink的主要不同點。文章建議收藏後閱讀。1.
  • Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink
    技術也隨著時代的變化而變化,從 Hadoop 的批處理,到 Spark Streaming,以及流批處理的 Flink 的出現,整個大數據架構也在逐漸演化。Apache Beam 作為新生技術,在這個時代會扮演什麼樣的角色,跟 Flink 之間的關係是怎樣的?Apache Beam 和 Flink 的結合會給大數據開發者或架構師們帶來哪些意想不到的驚喜呢?
  • 一篇文章讓深入理解Flink SQL 時間特性
    1.1 DataStream 轉化成 Table 時指定         由 DataStream 轉換成表時,可以在後面指定欄位名來定義 Schema。在定義 Schema 期間,可以111用.proctime,定義處理時間欄位。
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    1.5.5 是 Apache Flink 1.5 系列的第五個 bugfix 版本,包括超過 20 個修復程序和一些小改進,強烈建議所有用戶升級到 Flink 1.5.5,主要改進如下:改進[FLINK-10075] - HTTP connections to a secured REST endpoint
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    數據在源源不斷的進入flink,我們設置好window的大小為5s,flink會以5s來將每分鐘劃分為連續的多個窗口。對於存在延遲的數據,我們能容忍的時間是3s,超過3s我就不等你了,繼續進行窗口操作。這裡就要提到一個知識點:Window的觸發條件是什麼,什麼時候開始進行window操作?好,知道了window觸發條件後我們繼續分析,第一個條件肯定滿足的,只要有數據就行了。
  • Flink保證端到端exactly-once語義(也適用於kafka)
    1.4.0發布。flink提供了一個抽象的TwoPhaseCommitSinkFunction類,來讓開發者用更少的代碼來實現端到端的exactly-once語義。在flink1.4.0之前,flink通過checkpoint保證了flink應用內部的exactly-once語義。現在加入了TwoPhaseCommitSinkFunctio可以保證端到端的exactly-once語義。兩次提交來保證語義的方式需要flink所連接的外部系統支持兩部提交,也就是外部系統要支持可以預提交和回滾沒有最終提交的數據這樣子的特性。
  • 萬字詳解Flink極其重要的Time與Window(建議收藏)
    因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。例如,你有10分鐘的窗口和5分鐘的滑動,那麼每個窗口中5分鐘的窗口裡包含著上個10分鐘產生的數據,如下圖所示:
  • 用Blinkist學英語 (1)
    今天看到了竟然還可以點歌, 那就用正在聽的歌來開始自己的碎碎念.最近開始每天都在聽Blinkist了. (https://www.blinkist.com) check it out.能接名詞的to後面就不能接動詞原形,就是用it(或者任何名詞) 替換後面不能確定成分(不定式/動名詞)的部分。比如我們會說 i look forward to it( the meeting, etc) 但是不能說I want to* it. 而是得說 I want it.
  • 數據說話:大數據處理引擎Spark與Flink比拼
    Flink 任務圖(來源:https://ci.apache.org/projects/flink/flink-docs-release-1.5/concepts/runtime.html)  在 DAG
  • Flink 端到端 Exactly-once 機制剖析
    但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:      這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。
  • 英語中表示婚禮策劃的wedding planner用西語怎麼說?
    英語詞彙「wedding planner」在西班牙語中的同義表達形式是「organizador de bodas」。mucho dinero», «Sandra compagina su faceta como modelo con su formación para ser wedding planner» o «Podría decirse que se acercan más a la figura de un psicólogo que a la de un wedding planner».
  • Flink SQL 實戰:HBase 的結合應用
    當然,本文假設用戶有一定的 HBase 知識基礎,不會詳細去介紹 HBase 的架構和原理,本文著重介紹 HBase 和 Flink 在實際場景中的結合使用。主要分為兩種場景,第一種場景:HBase 作為維表與 Flink Kafka table 做 temporal table join 的場景;第二種場景:Flink SQL 做計算之後的結果寫到 HBase 表,供其他用戶查詢的場景。
  • Flink 是如何將你寫的代碼生成 StreamGraph 的 (上篇)
    一、絮叨兩句新的一年又來了,不知道大家有沒有立幾個每年都完不成的 FLAG ?反正我立了,我今年給自己立的 FLAG 是大致閱讀大數據幾個框架的源碼。為什麼要「大致」閱讀,因為這些牛逼的框架都是層層封裝,搞懂核心原理已經是很不易,更別談熟讀源碼了。之前幾篇源碼閱讀的文章,不知道大家有沒有親自動手打開 Idea 去試一試,這裡我再貼一下文章連結,大家可以再回顧一下。
  • Apache Flink 誤用之痛
    郵件列表:user@flink.apache.com/user-zh@flink.apache.orgStack Overflow:www.stackoverflow.com2.提到一致性和交付保障,其實可以通過幾個問題來引導大家完成這件事,如下圖所示:第1個問題,是否在乎數據的丟失?如果不在乎,你可以沒有 Checkpoint。第2個問題,是否在乎結果的正確性?
  • 提綱挈領,弄清Flink的分層架構
    我們知道,Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧如下圖所示:Flink生態圈核心組件棧掌握了Flink的分層架構,後面的學習就可以圍繞每個層級的核心內容來學習或研究。
  • OptaPlanner 這十年
    今天,Java 規劃引擎:optaplanner,應用在很多項目當中,與同領域軟體相比處於領先地位。OptaPlanner 優化了商業資源調度和規劃。十年,讓我們來看看,我的項目是如何成功的?期間,我做對了什麼,我做錯了什麼?