本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、內部實現等各個方面,帶你由淺入深地全面了解 Flink SQL。
1. 發展歷程
今年的8月22日 Apache Flink 發布了1.9.0 版本(下文簡稱1.9),在 Flink 1.9 中,Table 模塊迎來了核心架構的升級,引入了阿里巴巴Blink團隊貢獻的諸多功能,本文對Table 模塊的架構進行梳理並介紹如何使用 Blink Planner。
Flink 的 Table 模塊 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,用戶可以像操作表一樣操作數據,非常直觀和方便;SQL作為一種聲明式語言,有著標準的語法和規範,用戶可以不用關心底層實現即可進行數據的處理,非常易於上手,Flink Table API 和 SQL 的實現上有80%左右的代碼是公用的。作為一個流批統一的計算引擎,Flink 的 Runtime 層是統一的,但在 Flink 1.9 之前,Flink API 層 一直分為DataStream API 和 DataSet API, Table API & SQL 位於 DataStream API 和 DataSet API 之上。
Flink 1.8 Table 架構
在 Flink 1.8 架構裡,如果用戶需要同時流計算、批處理的場景下,用戶需要維護兩套業務代碼,開發人員也要維護兩套技術棧,非常不方便。 Flink 社區很早就設想過將批數據看作一個有界流數據,將批處理看作流計算的一個特例,從而實現流批統一,阿里巴巴的 Blink 團隊在這方面做了大量的工作,已經實現了 Table API & SQL 層的流批統一。 幸運的是,阿里巴巴已經將 Blink 開源回饋給 Flink 社區。為了實現 Flink 整個體系的流批統一,在結合 Blink 團隊的一些先行經驗的基礎上,Flink 社區的開發人員在多輪討論後,基本敲定了Flink 未來的技術架構。
Flink 未來架構
在Flink 的未來架構中,DataSet API將被廢除,面向用戶的API只有 DataStream API 和 Table API & SQL,在實現層,這兩個API共享相同的技術棧,使用統一的 DAG 數據結構來描述作業,使用統一的 StreamOperator 來編寫算子邏輯,以及使用統一的流式分布式執行引擎,實現徹底的流批統一。 這兩個API都提供流計算和批處理的功能,DataStream API 提供了更底層和更靈活的編程接口,用戶可以自行描述和編排算子,引擎不會做過多的幹涉和優化;Table API & SQL 則提供了直觀的Table API、標準的SQL支持,引擎會根據用戶的意圖來進行優化,並選擇最優的執行計劃。
2.Flink 1.9 Table 架構
Blink 的 Table 模塊的架構在開源時就已經實現了流批統一,向著 Flink 的未來架構邁進了第一步,走在了 Flink 社區前面。 因此在 Flink 1.9 合入 Blink Table 代碼時,為了保證 Flink Table 已有架構和 Blink Table的架構能夠並存並朝著 Flink 未來架構演進,社區的開發人員圍繞FLIP-32(FLIP 即 Flink Improvement Proposals,專門記錄一些對Flink 做較大修改的提議。FLIP-32是:Restructure flink-table for future contributions) 進行了重構和優化,從而使得 Flink Table 的新架構具備了流批統一的能力,可以說 Flink 1.9 是 Flink 向著流批徹底統一這個未來架構邁出的第一步。
Flink 1.9 Table 架構
在 Flink Table 的新架構中,有兩個查詢處理器:Flink Query Processor 和 Blink Query Processor,分別對應兩個Planner,我們稱之為 Old Planner 和 Blink Planner。查詢處理器是 Planner 的具體實現, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程將 Table API & SQL作業轉換成 Flink Runtime 可識別的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的調度和執行。
Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API;而 Blink 的查詢處理器則實現流批作業接口的統一,底層的 API 都是Transformation。
3.Flink Planner 與 Blink Planner
Flink Table 的新架構實現了查詢處理器的插件化,社區完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,用戶可以自行選擇使用 Old Planner 還是 Blink Planner。
在模型上,Old Planner 沒有考慮流計算作業和批處理作業的統一,針對流計算作業和批處理作業的實現不盡相同,在底層會分別翻譯到 DataStream API 和 DataSet API 上。而 Blink Planner 將批數據集看作 bounded DataStream (有界流式數據) ,流計算作業和批處理作業最終都會翻譯到 Transformation API 上。 在架構上,Blink Planner 針對批處理和流計算,分別實現了BatchPlanner 和 StreamPlanner ,兩者共用了大部分代碼,共享了很多優化邏輯。 Old Planner 針對批處理和流計算的代碼實現的是完全獨立的兩套體系,基本沒有實現代碼和優化邏輯復用。
除了模型和架構上的優點外,Blink Planner 在阿里巴巴集團內部的海量業務場景下沉澱了許多實用功能,集中在三個方面:
Blink Planner 對代碼生成機製做了改進、對部分算子進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的數據傾斜優化等新功能。Blink Planner 的優化策略是基於公共子圖的優化算法,包含了基於成本的優化(CBO)和基於規則的優化(CRO)兩種策略,優化更為全面。同時,Blink Planner 支持從 catalog 中獲取數據源的統計信息,這對CBO優化非常重要。Blink Planner 提供了更多的內置函數,更標準的 SQL 支持,在 Flink 1.9 版本中已經完整支持 TPC-H ,對高階的 TPC-DS 支持也計劃在下一個版本實現。整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出於穩定性的考慮,Flink 1.9 默認依然使用 Flink Planner,用戶如果需要使用 Blink Planner,可以作業中顯式指定。
4.如何啟用 Blink Planner
在IDE環境裡,只需要引入兩個 Blink Planner 的相關依賴,就可以啟用 Blink Planner。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.9.0</version></dependency>
對於流計算作業和批處理作業的配置非常類似,只需要在 EnvironmentSettings 中設置 StreamingMode 或 BatchMode 即可,流計算作業的設置如下:
// **********************// BLINK STREAMING QUERY// **********************import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.java.StreamTableEnvironment;StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);bsTableEnv.sqlUpdate(…);bsTableEnv.execute();
批處理作業的設置如下 :
// ******************// BLINK BATCH QUERY// ******************import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);bbTableEnv.sqlUpdate(…)bbTableEnv.execute()
如果作業需要運行在集群環境,打包時將 Blink Planner 相關依賴的 scope 設置為 provided,表示這些依賴由集群環境提供。這是因為 Flink 在編譯打包時, 已經將 Blink Planner 相關的依賴打包,不需要再次引入,避免衝突。
5. 社區長遠計劃
目前,TableAPI & SQL 已經成為 Flink API 的一等公民,社區也將投入更大的精力在這個模塊。在不遠的將來,待 Blink Planner 穩定之後,將會作為默認的 Planner ,而 Old Planner 也將會在合適的時候退出歷史的舞臺。目前社區也在努力賦予 DataStream 批處理的能力,從而統一流批技術棧,屆時 DataSet API 也將退出歷史的舞臺。