Flink 源碼|自定義 Format 消費 Maxwell CDC 數據

2021-01-09 阿里云云棲號

Flink 1.11 最重要的 Feature —— Hive Streaming 之前已經和大家分享過了,今天就和大家來聊一聊另一個特別重要的功能 —— CDC。

CDC概述

何為CDC?Change Data Capture,將資料庫中的』增』、』改』、』刪』操作記錄下來。在很早之前是通過觸發器來完成記錄,現在通過 binlog+同步中間件來實現。常用的 binlog 同步中間件有很多,比如 Alibaba 開源的 canal[1],Red Hat 開源的debezium[2],Zendesk 開源的 Maxwell[3] 等等。

這些中間件會負責 binlog 的解析,並同步到消息中間件中,我們只需要消費對應的 Topic 即可。

回到 Flink 上,CDC 似乎和我們沒有太大的關聯?其實不然,讓我們更加抽象地來看這個世界。

當我們用 Flink 去消費數據比如 Kafka 時,我們就仿佛在讀一張表,什麼表?一張不斷有記錄被插入的表,我們將每一條被插入的數據取出來,完成我們的邏輯。

當插入的每條數據都沒有問題時,一切都很美好。關聯、聚合、輸出。

但當我們發現,某條已經被計算過的數據有問題時,麻煩大了。我們直接改最後的輸出值其實是沒有用的,這次改了,當再來數據觸發計算時,結果還是會被錯誤的數據覆蓋,因為中間計算結果沒有被修改,它仍然是一個錯誤的值。怎麼辦?撤回流似乎能解決這個問題,這也確實是解決這個問題的手段,但是問題來了,撤回流怎麼確定讀取的數據是要被撤回的?另外,怎麼去觸發一次撤回?

CDC 解決了這些:將消息中間件的數據反序列化後,根據 Type 來識別數據是 Insert 還是 Delete;另外,如果大家看過 Flink 源碼,會發現反序列化後的數據類型變了,從 Row 升級為 RowData,RowData 能夠將數據標記為撤回還是插入,這就意味著每個算子能夠判斷出數據到底是需要下發還是撤回。

CDC 的重要性就先說這麼多,之後有機會的話,出一篇實時 DQC 的視頻,告訴大家 CDC 的出現,對於實時 DQC 的幫助有多大。下面讓我們回到正題。

既然有那麼多 CDC 同步中間件,那麼一定會有各種各樣的格式存放在消息中間件中,我們必然需要去解析它們。於是 Flink 1.11 提供了 canal-json 和 debezium-json,但我們用的是 Maxwell 怎麼辦?只能等官方出或者說是等有人向社區貢獻嗎?那如果我們用的是自研的同步中間件怎麼辦?

所以就有了今天的分享:如何去自定義實現一個 Maxwell format。大家也可以基於此文的思路去實現其他 CDC format,比如 OGG, 或是自研 CDC 工具產生的數據格式。

如何實現

當我們提交任務之後,Flink 會通過 SPI 機制將 classpath 下註冊的所有工廠類加載進來,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而對於 Format 來說,到底使用哪個 DeserializationFormatFactory,是根據 DDL 語句中的 Format 來決定的。通過將 Format 的值與工廠類的 factoryIdentifier() 方法的返回值進行匹配 來確定。

再通過 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,將反序列化對象提供給 DynamicTableSource。

通過圖來了解整個過程(僅從反序列化數據並消費的角度來看):

想要實現 CDC Format 去解析某種 CDC 工具產生的數據其實很簡單,核心組件其實就三個:

工廠類(DeserializationFormatFactory):負責編譯時根據 『format』 = 『maxwell-json』創建對應的反序列化器。即 MaxwellJsonFormatFactory。反序列化類(DeserializationSchema):負責運行時的解析,根據固定格式將 CDC 數據轉換成 Flink 系統能認識的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。Service 註冊文件:需要添加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory ,並在其中增加一行我們實現的 MaxwellJsonFormatFactory 類路徑。再通過代碼,來看看反序列化中的細節:

publicvoid deserialize(byte[] message, Collectorout) throws IOException { try { RowData row = jsonDeserializer.deserialize(message); String type = row.getString(2).toString(); // "type" field if (OP_INSERT.equals(type)) { RowData insert = row.getRow(0, fieldCount); insert.setRowKind(RowKind.INSERT); out.collect(insert); } elseif (OP_UPDATE.equals(type)) { GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" field GenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" field for (int f = 0; f < fieldCount; f++) { if (before.isNullAt(f)) { before.setField(f, after.getField(f)); } } before.setRowKind(RowKind.UPDATE_BEFORE); after.setRowKind(RowKind.UPDATE_AFTER); out.collect(before); out.collect(after); } elseif (OP_DELETE.equals(type)) { RowData delete = row.getRow(0, fieldCount); delete.setRowKind(RowKind.DELETE); out.collect(delete); } else { if (!ignoreParseErrors) { throw new IOException(format( "Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message))); } } } catch (Throwable t) { if (!ignoreParseErrors) { throw new IOException(format( "Corrupt Maxwell JSON message '%s'.", new String(message)), t); } } }

其實並不複雜:先通過 jsonDeserializer 將字節數組根據 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,然後根據 「type」 列的值來判斷數據是什麼類型:增、改、刪;再根據數據類型取出 「data」 或者 「old」 區的數據,來組裝成 Flink 認識的 INSERT/DELETE/UPDATE 數據並下發。

對象 jsonDeserializer 即 JSON 格式的反序列化器,它可以通過指定的 RowType 類型,讀取 JSON 的字節數組中指定的欄位並反序列化成 RowData。在我們的場景中,我們需要去讀取如下 Maxwell 數據的 「data」, 「old」 和 「type」 部分的數據。

{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}

因此 MaxwellJsonDeserializationSchema 中定義的 JSON 的 RowType 如下所示。

privateRowTypecreateJsonRowType(DataType databaseSchema) { // Maxwell JSON contains other information, e.g. "database", "ts"// but we don't need themreturn (RowType) DataTypes.ROW( DataTypes.FIELD("data", databaseSchema), DataTypes.FIELD("old", databaseSchema), DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType(); }

databaseSchema 是用戶通過 DDL 定義的 schema 信息,也對應著資料庫中表的 schema。結合上面的 JSON 和代碼,我們能夠得知 jsonDeserializer 只會取走 byte[] 中 data、old、type 這三個欄位對應的值,其中 data 和old 還是個嵌套JSON,它們的 schema 信息和 databaseSchema 一致。由於 Maxwell 在同步數據時,「old」區不包含未被更新的欄位,所以 jsonDeserializer 返回後,我們會通過 「data」 區的 RowData 將 old 區的缺失欄位補齊。

得到 RowData 之後,會取出 type 欄位,然後根據對應的值,會有三種分支:

insert:取出 data 中的值,也就是我們通過DDL定義的欄位對應的值,再將其標記為 RowKind.INSERT 類型數據,最後下發。update:分別取出 data 和 old 的值,然後循環 old 中每個欄位,欄位值如果為空說明是未修改的欄位,那就用 data 中對應位置欄位的值替代;之後將 old 標記為 RowKind.UPDATE_BEFORE 也就意味著 Flink 引擎需要將之前對應的值撤回,data 標記為 RowKind.UPDATE_AFTER 正常下發。delete:取出 data 中的值,標記為 RowKind.DELETE,代表需要撤回。處理的過程中,如果拋出異常,會根據 DDL 中maxwell-json.ignore-parse-errors的值來確定是忽視這條數據繼續處理下一條數據,還是讓任務報錯。

筆者在 maxwell-json 反序列化功能的基礎之上,還實現了序列化的功能,即能將 Flink 產生的 changelog 以 Maxwell 的 JSON 格式輸出到外部系統中。其實現思路與反序列化器的思路正好相反,更多細節可以參考 Pull Request 中的實現。

PR 實現詳情連結: https://github.com/apache/flink/pull/13090

功能演示

給大家演示一下從 Kafka 中讀取 Maxwell 推送來的 maxwell json 格式數據,並將聚合後的數據再次寫入 Kafka 後,重新讀出來驗證數據是否正確。

Kafka 數據源表

CREATETABLE topic_products ( -- schema is totally the same to the MySQL "products" tableidBIGINT, nameSTRING, description STRING, weight DECIMAL(10, 2)) WITH ('connector' = 'kafka','topic' = 'maxwell','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'maxwell-json');

Kafka 數據結果表&數據源表

CREATETABLE topic_sink ( nameSTRING, sum_weight DECIMAL(10, 2)) WITH ('connector' = 'kafka','topic' = 'maxwell-sink','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'maxwell-json');

MySQL 表

-- 注意,這部分 SQL 在 MySQL 中執行,不是 Flink 中的表CREATETABLE product (idINTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,nameVARCHAR(255),description VARCHAR(512),weight FLOAT);truncate product ;ALTERTABLE product AUTO_INCREMENT = 101;INSERTINTO productVALUES (default,"scooter","Small 2-wheel scooter",3.14), (default,"car battery","12V car battery",8.1), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), (default,"hammer","12oz carpenter's hammer",0.75), (default,"hammer","14oz carpenter's hammer",0.875), (default,"hammer","16oz carpenter's hammer",1.0), (default,"rocks","box of assorted rocks",5.3), (default,"jacket","water resistent black wind breaker",0.1), (default,"spare tire","24 inch spare tire",22.2);UPDATE product SET description='18oz carpenter hammer'WHEREid=106;UPDATE product SET weight='5.1'WHEREid=107;INSERTINTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);INSERTINTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);UPDATE product SET description='new water resistent white wind breaker', weight='0.5'WHEREid=110;UPDATE product SET weight='5.17'WHEREid=111;DELETEFROM product WHEREid=111;UPDATE product SET weight='5.17'WHEREid=102orid = 101;DELETEFROM product WHEREid=102orid = 103;

先看看能不能正常讀取 Kafka 中的 maxwell json 數據。

select * from topic_products;

可以看到,所有欄位值都變成了 Update 之後的值,同時,被 Delete 的數據也沒有出現。

接著讓我們再將聚合數據寫入 Kafka。

insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;

在 Flink 集群的 Web 頁面也能夠看到任務正確提交,接下來再讓我們把聚合數據查出來。

select * from topic_sink

最後,讓我們查詢一下 MySQL 中的表,來驗證數據是否一致;因為在 Flink 中,我們將 weight 欄位定義成 Decimal(10,2),所以我們在查詢 MySQL 的時候,需要將 weight 欄位進行類型轉換。

沒有問題,我們的 maxwell json 解析很成功。

寫在最後

根據筆者實現 maxwell-json format 的經驗,Flink 對於接口的定義、對於模塊職責的劃分還是很清晰的,所以實現一個自定義 CDC format 非常簡單(核心代碼只有200多行)。因此,如果你是用的 OGG,或是自研的同步中間件,可以通過本文的思路快速實現一個 CDC format,一起解放你的 CDC 數據!

相關焦點

  • 使用Flink進行實時日誌聚合:第一部分
    使用Flink、Kafka和Solr進行日誌聚合在此初始解決方案中,讓我們使用Cloudera平臺中可用的處理框架來構建可伸縮且完全可自定義的日誌聚合堆棧。我們的目標是建立一個日誌聚合管道,以服務於我們的實時數據處理應用程式以及任何數據處理或其他類型的應用程式。
  • incaseformat病毒發展歷程,如何殺毒及恢復數據!
    近兩日,PC端一種名為incaseformat的蠕蟲病毒在國內爆發,會強制性的自複製並刪除除系統盤外的所有磁碟文件,對用戶造成難以估量的損失。因此,事感焦急,這邊給大家詳細介紹incaseformat病毒發展歷程,如何殺毒及被病毒刪除的數據如何恢復回來,及時止損。
  • 源碼資本王星石:消費升級大潮下,如何挖掘獨角獸? | 獵雲網
    獵雲註:如何理解消費升級現象背後的規律,並發掘偉大的消費企業?源碼資本王星石認為,需求分化、此消彼長是消費升級的主要行為模式,「此消」、「彼長」兩個方向都有機會出現獨角獸。獨角獸來自三個象限,即大眾升級需求、大眾降級需求、小眾升級需求。
  • 仿微信的IM聊天時間顯示格式(含iOS/Android/Web實現)[圖文+源碼]
    ,效果可媲美微信 [附件下載]》《高仿Android版手機QQ可拖拽未讀數小氣泡源碼 [附件下載]》《Android聊天界面源碼:實現了聊天氣泡、表情圖標(可翻頁) [附件下載]》《高仿Android版手機QQ首頁側滑菜單源碼 [附件下載]》《分享java AMR音頻文件合併源碼,全網最全》《Android版高仿微信聊天界面源碼
  • 袋鼠雲大數據崗位面試題
    (2)flink的怎麼和rocksDB交互的。畫一個流程圖。(這個我也不會)  (3)flink怎麼實現Exactly-once?  (4)flink on yarn 的任務提交流程?  (5)rocksDB為什麼可以存儲那麼大的數據量。  (6)使用eventtime+watermark的時候,如果數據到6點結束了。
  • Pengenalan Format-format pada Video
    yang digunakannya pada Smartphone.AVI (Audio Video Interleave)AVI atau dapat disebut Audio Video Interleave merupakan format berkas video buatan Microsoft.
  • SpringSecurity系列之自定義登錄驗證成功與失敗的結果處理
    SpringSecurity系列之自定義登錄驗證成功與失敗的結果處理一、需要自定義登錄結果的場景在我之前的文章中,做過登錄驗證流程的源碼解析。作為安全框架的時候,都需要我們使用本節學到的知識進行自定義的登錄驗證結果處理。
  • Incaseformat蠕蟲病毒爆發,北信源用戶無需緊張
    摘要:管理員注意,incaseformat蠕蟲病毒來襲!請儘快安裝終端安全管理軟體,進行終端防護及時止損。事件背景:近期,國內多家用戶反饋辦公設備被incaseformat 蠕蟲病毒感染,機器中除了系統盤以外,其他文件全部被刪除。
  • 全球數據資源
    http://www.spatialecology.com/ *提供很多有用的GIS擴展模塊和工具http://cdc.cma.gov.cn/ *中國氣象科學數據共享服務網以下部分轉貼自數據的天空綜合資料、降水、SST、地面覆蓋資料、風場/OLR/指數資料綜合資料庫:http://www.lasg.ac.cn/cgi-bin/
  • incaseformat蠕蟲病毒來襲
    近日發現,名為incaseformat的蠕蟲病毒在國內大面積爆發,感染現象為除C盤外,其他盤全部被格式化,造成數據大量丟失,信息丟失。目前,已發現多個區域不同行業用戶遭到感染,病毒傳播範圍暫未見明顯的針對性。
  • incaseformat蠕蟲病毒爆發 深信服免費提供查殺工具
    近日,深信服安全團隊監測到一種名為incaseformat的病毒,全國各個區域都出現了被incaseformat病毒刪除文件的用戶。經調查,該蠕蟲正常情況下表現為文件夾蠕蟲,執行後會自複製到系統盤Windows目錄下,並創建註冊表自啟動,一旦用戶重啟主機,使得病毒母體從Windows目錄執行,病毒進程將會遍歷除系統盤外的所有磁碟文件進行刪除,對用戶造成不可挽回的損失。
  • incaseformat蠕蟲病毒爆發,深信服免費提供查殺工具
    近日,深信服安全團隊監測到一種名為incaseformat的病毒,全國各個區域都出現了被incaseformat病毒刪除文件的用戶。經調查,該蠕蟲正常情況下表現為文件夾蠕蟲,執行後會自複製到系統盤Windows目錄下,並創建註冊表自啟動,一旦用戶重啟主機,使得病毒母體從Windows目錄執行,病毒進程將會遍歷除系統盤外的所有磁碟文件進行刪除,對用戶造成不可挽回的損失。
  • Incaseformat病毒大面積突發,警惕文件遭刪除
    近日,一種叫Incaseformat的病毒席捲而來,全國各個區域都有用戶出現電腦被Incaseformat病毒佔據的跡象。具體表現形式為:除c盤以外的所有磁碟都被清空,或者存在一兩個文件夾,且文件夾內包含著名稱為「incaseformat.txt」的文件。那麼這種病毒到底是什麼來頭?會給我們的計算機帶來什麼危害?
  • 數據自動化第三篇(上):三種方法實現數據入庫功能
    老規矩,先看代碼結構:我們在這裡定義一個類,叫做mysqlelper當中包含如下幾個函數:__init__:初始化這個類,並且接收資料庫的資料庫地址、埠、帳號、密碼、資料庫名稱等信息;insterdata_bydf:插入數據函數。
  • 俯瞰Dubbo全局,閱讀源碼前必須掌握這些!!
    自開源半年多以來,已成功為十幾家中小型企業提供了精準定時調度方案,經受住了生產環境的考驗。為使更多童鞋受益,現給出開源框架地址:https://github.com/sunshinelyz/mykit-delay既然是要寫深度解析Dubbo源碼的系列專題,我們首先要做的就是搭建一套Dubbo的源碼環境,正所謂「工欲善其事,必先利其器」。
  • 重磅:LayaAir2.2將引擎源碼全面切換至TypeScript,支持WebGL2.0,並...
    從此以後,LayaAir引擎源碼從ActionScript3全面轉向TypeScript,引擎運行的JS全面支持ES6標準,在未來的引擎API設計上會利用TypeScript語言特性進行調整,提升開發效率。除此之外,這次的版本,引擎的默認圖形API調整為WebGL 2.0,且3D方面也有著大量實用功能的增加,本文會逐一為大家介紹。
  • incaseformat蠕蟲病毒來襲,磁碟文件被大規模刪除
    2021年1月13日,一種名為incaseformat的蠕蟲病毒在國內爆發。該蠕蟲病毒執行後會自複製到系統盤Windows目錄下,並創建註冊表自啟動,刪除用戶除C盤以外的所有磁碟文件,危害極大。
  • 懂Excel輕鬆入門Python數據分析包pandas(二十一):透視表
    此系列文章收錄在公眾號中:數據大宇宙 > 數據處理 >E-pd經常聽別人說 Python 在數據領域有多厲害,結果學了很長時間,連數據處理都麻煩得要死。pandas 中添加這2列是非常簡單"Excel 透視表是百分比呀"pandas透視表功能沒有參數設置,因為本身透視出來的還是一個DataFrame,這可以利用之前學到的一切技巧來為這個DataFrame"添油加醋":默認情況下,如果把一個 DataFrame 作為文本顯示,小數是不會變百分比,這裡可以設置 pd.set_option('display.float_format
  • 火絨安全關於Incaseformat蠕蟲病毒詳細解答
    1月13日, 「Incaseformat」病毒因其破壞性以及集中爆發的特性,在全網引起了大量用戶的高度關注。火絨安全實驗室迅速對該事件跟進確認,並整理、解答用戶關心的8個重要問題。1、incaseformat病毒是什麼類型病毒?「incaseformat病毒」為蠕蟲病毒,不具備加密文件危害。