作者 | 蔡適擇(順豐大數據平臺負責人)
整理 | 趙陽(Flink 社區志願者)
本文主要介紹順豐在數據倉庫的數據實時化、資料庫 CDC、Hudi on Flink 上的實踐應用及產品化經驗。文章主要分為以下幾部分:
順豐業務介紹
Hudi on Flink
產品化支持
後續計劃
1
順豐業務
1.1 順豐大數據的應用
先來看一下順豐大數據業務的全景圖。
大數據平臺,中間的基礎部分是大數據平臺,這塊是順豐結合開源組件自行搭建的。與之相關的是大數據分析與人工智慧,順豐有一個非常強的地面部隊,就是線下的快遞小哥以及運輸車輛,需要使用 AI 以及大數據分析來輔助管理,提升整體效率。
區塊鏈,順豐對接了很多客戶與商家,對於商家來說,首先需要確保快件是可信的能夠做貨物的交易與交換。這塊涉及的基本上都是品牌商家,溯源與存證的業務順豐也有涉及。
IoT,就像之前提及到的,因為順豐地面部隊較多,相應需要採集的數據也會比較多。我們的部分包裹中是有傳感器的,車輛也有相關的傳感器,如車輛的攝像頭,以及快遞小哥的手環(包含地理位置、員工的健康狀態,對應做一些關懷的舉動)。同時,還有一些工作場景既有叉車,也有分揀設備,這些就需要大數據平臺來做一些聯動,因此 IoT 的應用相對較多。
智慧供應鏈和智慧物流,這兩塊更多的是指如何用大數據的手段輔助業務做一些經營上的決策。比如我們有很多 B 端客戶,對於他們來說如何在每個倉庫裡備貨,如何協調以及互相調撥,這部分就由智慧物流來完成。
下面這塊就是 IOT 實踐中的一部分:
從上面可以看出物流本身的環節是非常多的,下單、小哥收件、分揀、陸運中轉等整個過程,紅色解釋部分是指我們會做的一些 IoT 與大數據結合的應用,這裡其實大部分都是基於 Flink 來完成的。
1.2 順豐大數據技術矩陣
下面這張圖是順豐目前大數據整體的架構概覽:
數據集成層:最下面為數據集成層,因為順豐的歷史原因,所以包含了很多數據存儲引擎,如 Oracle、MySQL、MongoDB 等,並且部分引擎仍會繼續支持。右下物聯網設備相對較新,主要是進行包含普通文本、網絡資料庫、圖像、音頻、視頻等的數據採集。
數據存儲計算:實時這塊順豐目前用的最多的還是 Flink,Storm 沒有標示出來,目前我們在做遷移。消息中間件處理目前主要使用 Kafka。然後右邊存儲結構的種類就相對豐富,因為不同的場景有不同的處理方式,比如數據分析需要性能比較強的 Clickhouse;數倉和離線計算這塊還是比較傳統,以 Hive 為主結合 Spark,目前我們是結合 Flink 與 Hudi 去實現離線實時化。
數據產品,我們傾向的還是首先降門檻,讓內部開發與用戶更容易上手。內部同學如果要掌握如此多的組件,成本是非常高的,再加上規範化會導致溝通、維護以及運維的高額成本,所以我們一定要去做一些產品化、規範化的事情。
1.3 順豐科技數據採集組成
上圖就是我們大數據整體數據採集的概覽,數據採集當前包括微服務的應用,部分數據直發到 Kafka,還有些會落成日誌,然後我們自己做了一個日誌採集工具,類似於 Flume,更加的輕量化,達到不丟、不重、以及遠程的更新、限速。另外我們也會將 Kafka 中的數據通過 Flink 放到 HDFS,以 Hudi 的形式去做。下面會詳細介紹。
1.4 順豐數據應用架構
上圖是一個簡單的應用架構,剛才所說的大數據平臺數據我們會按需推送到 OLAP 分析引擎、資料庫,這部分數據推送過去之後,到達數據服務平臺。該數據服務平臺主要是考慮到用戶或研發對接資料庫更便捷,以往在使用時,內部用戶首先需要了解大數據組件的使用,而現在通過我們的數據服務產品以配置化的方式配置查詢條件、聚合條件即可,最終把結果生成一個 restful 接口,業務系統可直接調用。比如研發用戶需要做搜索,只需要關注入參、出參,中間的過程不需要了解,這樣的話就能夠最大化的把技術門檻降下來,使用時也會更高效簡便。
中間部分我們是基於 Kong 做的網關,在 Kong 裡面可以加很多種通用的能力,包括監控、限流、緩存等都可以在裡面完成。
右邊的 Graphql,是 Facebook 開源的一個組件。前端用戶經常會出現需求的變更,後臺接口需要相應地進行調整,這種情況就可以使用 Graphql 來支持。這裡其實是有兩個東西:apollo、graphql_Java,兩條線,apollo 適用於前端的研發用戶,用 node_js 來完成控制層的內容;graphql_Java 適用於後端的用戶,主要提供一些接口。
2
Hudi on Flink
2.1 Hudi 介紹
接下來我們主要介紹 Hudi on Flink 在順豐的應用實踐。Hudi 的核心優勢主要分為兩部分:
首先,Hudi 提供了一個在 Hadoop 中更新刪除的解決方案,所以它的核心在於能夠增量更新,同時增量刪除。增量更新的好處是國內與國際現在對隱私數據的保護要求比較高,比如在 Hive 中清理刪除某一個用戶的數據是比較困難的,相當於重新清洗一遍數據。使用 Hudi 可以根據主鍵快速抓取,並將其刪除掉。
另外,時間漫遊。之前我們有很多應用需要做準實時計算。如果要找出半個小時內的增量到底是什麼,變化點在哪,必須要把一天的數據全撈出來,過濾一遍才能找出來。Hudi 提供時間漫遊能力,只需要類似 SQL 的語法就能快速地把全部增量撈出來,然後後臺應用使用時,就能夠直接根據裡面的數據做業務的更新,這是 Hudi 時間漫遊裡最重要的能力。
Hudi 有兩種的寫的方法:
copy on write。
copy on write 這種形式更多是在每次寫的時候,能夠重寫歷史中關於更新記錄所在的文件,把它重寫並且把增量部分再重新記錄下來,相當於把歷史狀態也給記錄下來。唯一的不足之處在於,寫的時候性能會稍微弱,但是讀的性能是很強的,和正常使用 Hive 沒有什麼區別。這個也是 Hudi 本身的優點。實時性略低,這部分取決於寫的文件合併的頻率。不過批量的話,寫也不會影響到多少性能,所以本身也是批量的去寫。比如每隔幾分鐘寫一次,這個其實也不會產生很高的性能損耗,這就是 copy on write。
merge on read
merge on read 就是寫的時候實時會把 log 以 append 方式寫到 HDFS 中並寫成文件,然後在讀的時候將已經生成的文本,再加上增量的部分合併,做一個 merge 操作。好處在於查詢的時候數據都是實時的,但是由於查詢任務確實較多,相當於是說每次查的時候,都要把兩部分數據取出來並做一個合併,因此也會造成損耗。
以上是 Hudi 情況的簡單介紹。
2.2 Hudi on Flink 組成部分 - 資料庫實時化
上圖是我們將數據實時化 CDC 的過程。資料庫的 CDC,基本上都是只能到庫級別、庫粒度。前面的 source 支撐肯定也還是庫粒度,中間會經過兩個過程:
一部分是 DML,它會有過濾,當庫裡面有 100 張表時,很多時候有些表是不需要的,這部分我們會直接過濾掉,過濾就主要是通過產品化來打通它。
另一部分是 DDl,能夠實時更新 schema。比如庫表欄位的增加或者變更,再或者可能加了個表或者改了一個表,這部分會在實時程序中打通數據直通車,只要有任何變更,就會生成一個新的版本,然後將元數據信息記錄到直通車裡,同時也會包裝到 binlog kafka sink 裡記錄,每一行會打上相應的版本號。這樣的話就對於後面的使用就能夠直接對應該條記錄,使用非常方便,不會有出錯的情況。
2.3 Hudi on Flink 組成部分 - 數倉實時化
這部分主要分享我們數倉實時化的過程,我們的目標是實現 Kafka 裡的數據在當前離線數倉中也能真正用起來,包括很多做準實時計算的用戶也能夠真正用起來。Hudi on Flink 就是我們嘗試的方案。以前 Hudi 這塊也做了 Hudi on Spark 方案,是官方推薦使用的方案,其實相當於多維護一個組件,但是我們大方向上還是希望所有實時的東西都能夠讓 Flink 去完成,另外也希望是 Flink 的應用生態能夠做得更加全面,在這部分就真正去把它落地下來,並且在生產中應用起來。
其實整個過程,比如做表數據實時化的時候,它是分為兩部份,一部分數據初始化,在啟動的時候,會把數據重新做批量的拉取,這個是用 Flink batch 來做的,其實社區本身也有提供這種能力。另外 Hudi 本身也具備把存量的 Hive 表 Hudi 化的能力,這是 Hudi 最新才出來的功能。這部分我們會用 Flink batch 的方式重新抽一遍,當然也有存量,對於存量的一些表,可以直接用存量表來轉化,然後用 Flink batch 做初始化。
另外一部分是增量更新,增量更新是指有個 DB connect 對接 Kafka,從 Kafka 的 source 拿到資料庫增量 CDC 的 binlog,然後把 binlog 進行加工,同時再利用 Flink 本身的 checkpoint 機制(Flink 本身的 checkpoint 整體頻率可以控制)進行 snapshot 的過程。其中所做的內容也我們自己可以控制的,所以採用 checkpoint 的形式可以把 Hudi 所需要做的 upsert 的操作全部在 checkpoint 中更新到線上,最終形成 Hudi 裡面的實時數據。
2.4 Hudi 數倉寬表方案
直接將 Kafka 數據扔到 Hudi 裡相對容易,真正困難的點在於寬表。對於整個 Hudi 來說,寬表是涉及到很多維表,當很多維表或者事實表更新的時候,會由多個事實表做一個關聯。但不是每個事實表都能抓到寬表的真正主鍵,因此 Hudi 沒法做這種更新。所以如何把寬表做數據實時化是一個難題。
上圖是順豐的寬表方案。
第一層,對於 ODS,可以直接連接 Kafka,用 Hudi on Flink 的框架就能夠完成。
第二層,DWD,這裡也有兩種辦法:
一種是用 Flink SQL 先把實時的 Kafka 寬表做完,不過這種辦法成本會高一點,相當於再次引入了 Kafka,整個數據鏈路變長,如果真正需要去用實時寬表可以小部分去推,但如果不存在純實時數據的需求,就沒有必要去做 DWD 的實時 Kafka 寬表。
另外,在沒有 DWD 的實時 Kafka 寬表的情況下,如何完成上述離線層的 DWD 實時化?這裡有幾個步驟,首先創建一個維表的 UDF 做表關聯,也是最方便的方式。其次,可以考慮直接用 join 的方式,用兩個實時表來做關聯,但可能存在關聯不到的情況。
當然,做維表關聯,就涉及到外鍵主鍵的映射。外鍵主鍵映射是為了讓我們能夠在另一個事實表更新時,快速找到主鍵在哪,即外鍵主鍵的映射 。另外主鍵索引,主鍵索引其實也是跟外鍵主鍵的映射相關。至於外鍵主鍵的映射,相當於把它建成一個新的表主鍵索引獲取,這樣增量更新 Hudi 跟原來的 ODS 層就基本上一致了,這就是寬表實時加工的過程。下圖為運單的寬表舉例。
3
產品化支持
上述從技術層面分析了順豐當下業務架構的相關情況,以下將分享我們在產品化上所做的一些支持工作。
3.1 數據直通車
上圖是我們的數據直通車,能夠做到讓用戶自己在產品中操作,不需要寫代碼即可完成,可以實現低門檻的快速簡便的應用。比如配置數據接入僅需 1 分鐘左右,整個過程就是在產品上以配置化的手段就能夠將數據最終落在資料庫,我們的離線表、數倉、做數據分析都能夠直接快速的運用起來。
另外,數據接入進來之後,需要有數據管理的能力。上圖是數據管理能力測試環境的簡單情況,我們需要讓用戶能夠管理相關的數據,首先誰用它了,其次它涉及什麼欄位,有哪些具體的內容,同時它裡面的血緣關係又是怎麼樣的,這個就是我們數據資產管理所具備的功能。
3.2 實時數據使用
上圖是我們 binlog 的 SDK,其實像 binlog 這種 avro 的格式,對用戶來說使用有一定門檻。但還是有一些編碼的用戶,對於這些用戶我們提供具體的 SDK,所以在 SDK 裡真正使用時都做到簡便。左邊看起來是 json,實際上是 avro 格式。右邊的內容就是在 Java 上的使用情況,這個是在代碼層面輔助研發快速應用的工具。
我們在平臺上也做一些簡化的內容,首先有一部分是關於拖拽的,拖拽是指封裝一些組件,用戶可以通過拖拽來快速完成其需求。這個產品上線後,很多之前沒有任何實時計算的經驗,甚至連離線開發的經驗也沒有的用戶都能夠做實時的數據開發。
上圖為實時指標採集,產品上線之後有很多監控的需求,Flink 本身提供很多 Metric,用戶也有很多 Metric,我們希望為用戶提供一個高效的解決方案,把 Metric 全部採集出來,讓用戶能夠快速應用。
這裡在監控裡面也做了幾個工作,一個是爬蟲方案,實現一個 akka 的客戶端,Flink 本身是 akka 的框架,每個 jobmannager 都有 akka 的服務、接口,這樣只要實現一個 akka 的客戶端,就能夠以 akka 的 API 形式獲取具體的 Metric 情況。這部分採集完之後發到 Kafka,最終存到 TDengine 再到 Grafana,提供給用戶。Grafana 也會整合到我們的實時計算平臺產品裡面來,在面對存量的情況時,不需要重啟用戶的任務,就能夠直接做數據採集。
但在面對增量情況時,就需要補充一些 Metric,比如 CPU 使用率、內存的使用率等。這部分我們以 Reporter 方案來滿足,Reporter 方案也是社區當前主推的方案。Reporte r 方案的原理其實是在 Flink 的 Metrics Reporter 裡進行插件開發,然後發到 gateway,這個 gateway 其實就是為了避免 Kafka 客戶端過多的問題,所以這裡中間做一個網關,後面還是和上面的一致,這個就是 Flink 的任務監控情況。
4
後續計劃
上述已經分享了我們在內部已經落地、實際應用的過程,後續我們還會做什麼?
4.1 彈性計算
首先,彈性計算。目前像監控任務,用戶申請的資源遠遠超過實際需要使用的資源,會造成嚴重的資源浪費,內存也一樣。處理類似情況時,我們使用了 Flink 延伸的框架 Metrics monitor,結合採集的 Metrics,能夠做到當整個使用率過低或過高的時候,及時調整達到資源擴縮容或者並發擴容。
4.2 Flink 替換 Hive 演進
上面提到我們存量是有非常多的 Hive 任務,包括 Spark 任務需要進行替換,但怎麼去做呢?
首先我們用 Flink 來替換,由於強制或平臺自動推薦都有難度,所以我們做了一些折中方案。比如埋點,當需要把數據寫到 Hive 的某個表,它會經過 Hiveserver,SQL 解析之後,此時將表進行替換,執行兩個路線:一個是正常的 table 這樣執行會寫到 Hive 裡面去。另外也會埋點把寫的表替換成另一個表,然後同時再以 Flink 的形式去執行一遍,不過會產生額外的資源消耗,執行大概生成兩個表,需要自動計算兩者是否一致。如一致測試穩定後就能以計算框架來去替換它。
大部分任務是兼容的可替換的,但也有小部分不兼容的情況,這部分可以採取人工處理,以儘量實現整個技術上的統一,這部分是後續需要完成的。
4.3 批流一體化
上圖是我們做批流一體化的過程,批流一體化在元數據管理與權限管理部分都已經有一些落地。
除此之外我們結合剛剛所說替換的過程,上圖就是 SQL 的兼容測試。因為這幾者都做完之後,其實批流一體化可以同步去做,相當於同一個接口,加一個參數,即可實現流批處理底層引擎的快速切換,有助於整個數據開發能夠保持一致,所以批流一體化也是後面需要嘗試的。
上圖實際上是我們一體化整個框架的最終形式。首先上面有一層 IDE 能夠讓所有的用戶使用。然後下面各種基礎功能支持,包括自動補全的 SQL 語法解析功能的支持,再往下就是一些資源管理、調度管理和知識管理,這些也是為了輔助開發而用的。再下面一層是計算引擎,要把這些計算引擎跟用戶做一個大的隔離,讓用戶不用再關注底層技術的實現和使用,這是我們後面的要持續去做的事情。
活動推薦
12 月 13 日 -15 日,大數據領域頂級盛會 Flink Forward Asia 2020 在線峰會即將重磅開啟!
來自阿里巴巴、騰訊、字節跳動、美團、滴滴、微博、網易、京東、知乎等全球 38+ 一線廠商分享 70+ 優質議題。內容涵蓋 Flink 年度最佳實踐、核心技術解析、實時數倉應用、開源生態發展、機器學習及金融行業應用等多個領域!