Flink + Debezium CDC 實現原理及代碼實戰

2021-12-29 KK架構
一、Debezium 介紹Debezium 是一個分布式平臺,它將現有的資料庫轉換為事件流,應用程式消費事件流,就可以知道資料庫中的每一個行級更改,並立即做出響應。Debezium 構建在 Apache Kafka 之上,並提供 Kafka 連接器來監視特定的資料庫。在介紹 Debezium 之前,我們要先了解一下什麼是 Kafka Connect。二、Kafka Connect 介紹Kafka 相信大家都很熟悉,是一款分布式,高性能的消息隊列框架。一般情況下,讀寫 Kafka 數據,都是用 Consumer 和 Producer  Api 來完成,但是自己實現這些需要去考慮很多額外的東西,比如管理 Schema,容錯,並行化,數據延遲,監控等等問題。而在 0.9.0.0 版本之後,官方推出了 Kafka Connect ,大大減少了程式設計師的工作量,它有下面的特性:REST 接口,用來查看和管理Kafka connectors;自動化的offset管理,開發人員不必擔心錯誤處理的影響;Kafka Connect 有兩個核心的概念:Source 和 Sink,Source 負責導入數據到 Kafka,Sink 負責從 Kafka 導出數據,它們都被稱為是 Connector。如下圖,左邊的 Source 負責從源數據(RDBMS,File等)讀數據到 Kafka,右邊的 Sinks 負責從 Kafka 消費到其他系統。三、Debezium 架構和實現原理Debezium 有三種方式可以實現變化數據的捕獲以插件的形式,部署在 Kafka Connect 上在上圖中,中間的部分是 Kafka Broker,而 Kafka Connect 是單獨的服務,需要下載  debezium-connector-mysql  連接器,解壓到伺服器指定的地方,然後在 connect-distribute.properties 中指定連接器的根路徑,即可使用。這種模式中,需要配置不同的連接器,從源頭處捕獲數據的變化,序列化成指定的格式,發送到指定的系統中。內嵌模式,既不依賴 Kafka,也不依賴 Debezium Server,用戶可以在自己的應用程式中,依賴 Debezium 的 api 自行處理獲取到的數據,並同步到其他源上。四、使用 Docker 來安裝 Debezium Kafka Mysql這裡我們使用官網提供的 Docker 方式快速的搭建一個演示環境。Docker 的安裝和基本命令,可以參考我之前的文章或者在網上找相關的教程。1. 首先獲取一個 zk 的鏡像
docker pull debezium/zookeeper

以 daemo 的方式運行鏡像,並且暴露 2181,2888,3888 埠
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

--rm 表示容器停止後刪除本地數據-d 表示在後臺運行容器2. 獲取一個 kafka 的鏡像
docker pull debezium/kafka

docker run -d -it --rm --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --link zookeeper:zookeeper debezium/kafka

--link 表示可以和 zookeeper 容器互相通信3. 拉取一個 mysql 的鏡像
docker pull debezium/example-mysql

在後臺執行 mysql 的鏡像

docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql

4. 單獨開一個 shell 進入 mysql 的命令行中
docker run -it --rm --name mysqlterm --link mysql --rm debezium/example-mysql sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

5 拉取一個 debezium/connect 的鏡像
docker pull debezium/connect

啟動 kafka connect 服務

docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect

啟動之後,我們可以使用 rest api 來檢查 Kafka Connect 的服務狀態

curl -H "Accept:application/json" localhost:8083/

{"version":"2.6.0","commit":"62abe01bee039651","kafka_cluster_id":"vkx8c6lhT1emLtPSi-ge6w"}

使用 rest api 來查看有多少 connect 服務註冊到 Kafka Connect 上了

curl -H "Accept:application/json" localhost:8083/connectors/

現在會返回一個空數組,表示還沒有服務註冊上去。

6 註冊一個 Connector 去檢測 mysql 資料庫的變化註冊的話,需要往 Kafka Connect 的 rest api 發送一個 Post 請求,請求內容如下3 task 最大數量,應該配置成 1,因為 Mysql 的 Connector 會讀取 Mysql 的 binlog,使用單一的任務才能保證合理的順序;4 這裡配置的是 mysql,其實是一個 host,如果非 docker 環境,則要配置成 ip 地址或者可以解析的域名;5 唯一的 serverId,會被作為 Kafka Topic 的前綴;執行下面的命令發送一個 Post 請求,註冊到 Kafka Connect 上:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

7 新開一個 shell,啟動一個 Kafka 的消費者,來查看 Debezium 發送過來的事件
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.inventory.customers

8 在 mysql 的命令行窗口上,修改一條數據
use inventory;
UPDATE customers SET first_name='Anne211' WHERE id=1004;

9 觀察 kafka 消費者窗口的變化

發現會發送過來兩條 json,一條是更新的哪個主鍵,一條是具體的更新內容

五、Flink 集成 Debezium 同步數據下面我們使用 Flink 來消費 Debezium 產生的數據,把變更的數據都同步到另外一張表中。主要步驟有:
package com.hudsun.flink.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author wangkai
 * @Time 2020/12/22 23:11
 */
public class DebeziumCDC {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSettings = EnvironmentSettings.newInstance()
                .inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(env, streamSettings);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // debezium 捕獲到變化的數據會寫入到這個 topic 中
        String topicName = "dbserver1.inventory.customers";
        String bootStrpServers = "192.168.56.10:9092";
        String groupID = "testGroup";

        // 目標資料庫地址
        String url = "jdbc:mysql://192.168.56.10:3306/inventory";
        String userName = "root";
        String password = "debezium";
        String mysqlSinkTable = "customers_copy";


        // 創建一個 Kafka 數據源的表
        tableEnvironment.executeSql("CREATE TABLE customers (\n" +
                " id int,\n" +
                " first_name STRING,\n" +
                " last_name STRING,\n" +
                " email STRING \n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '" + topicName + "',\n" +
                " 'properties.bootstrap.servers' = '" + bootStrpServers + "',\n"
                +
                " 'debezium-json.schema-include' = 'true',\n" +
                " 'properties.group.id' = '" + groupID + "',\n" +
                " 'format' = 'debezium-json'\n" +
                ")");

        // 創建一個寫入數據的 sink 表
        tableEnvironment.executeSql("CREATE TABLE customers_copy (\n" +
                " id int,\n" +
                " first_name STRING,\n" +
                " last_name STRING,\n" +
                " email STRING, \n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + mysqlSinkTable + "'\n" +
                ")");
        String updateSQL = "insert into customers_copy select * from customers";
        TableResult result = tableEnvironment.executeSql(updateSQL);

        env.execute("sync-flink-cdc");
    }
}

最後的最後,推薦進我的微信群,每天都有在更新乾貨,公眾號回覆:進群,即可。

相關焦點

  • Flink CDC 系列 - 實時抽取 Oracle 數據,排雷和調優實踐
    說明:本文力求根據實際的問題排查經驗,以及內部執行原理分享一些 「乾貨」,所以對 Flink CDC,以及其內置的 Debezium 模塊的基礎使用方法並未涉及,對於基礎的使用方法、參數等,讀者可參考以下地址:Flink CDChttps://ververica.github.io/flink-cdc-connectors
  • 兩隻松鼠的故事:flink-connector-opengauss & CDC實戰
    答案是肯定的,接下來我們就來實現一個簡單的opengauss的Flink connector註冊動態表工廠(DynamicTableFactory),以及相關Sink程序經過上面三步,就可以實現一個簡單的connector了。
  • Flink CDC 2.0 數據處理流程全面解析
    本文先以Flink SQL 案例來介紹Flink CDC2.0的使用,接著介紹CDC中的核心設計包含切片劃分、切分讀取、增量讀取,最後對數據處理過程中涉及flink-mysql-cdc 接口的調用及實現進行代碼講解。
  • Flink 源碼|自定義 Format 消費 Maxwell CDC 數據
    在很早之前是通過觸發器來完成記錄,現在通過 binlog+同步中間件來實現。常用的 binlog 同步中間件有很多,比如 Alibaba 開源的 canal[1],Red Hat 開源的debezium[2],Zendesk 開源的 Maxwell[3] 等等。
  • flink sql 知其所以然(一)| source\sink 原理
    感謝您的關注  +  點讚 + 再看,對博主的肯定,會督促博主持續的輸出更多的優質實戰內容!!!1.序篇-本文結構本文從以下五個小節介紹 flink sql source\sink\format 的概念、原理。
  • 乾貨 | Debezium實現Mysql到Elasticsearch高效實時同步
    logstash和kafka_connector都僅支持基於自增id或者時間戳更新的方式增量同步數據。回到問題本身:如果庫表裡沒有相關欄位,該如何處理呢?本文給出相關探討和解決方案。1、 binlog認知1.1 啥是 binlog?
  • Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink
    AI 前線導讀: 本文是 Apache Beam 實戰指南系列文章 第二篇,將重點介紹 Apache Beam 與 Flink 的關係,對 Beam 框架中的 KafkaIO 和 Flink 源碼進行剖析,並結合應用示例和代碼解讀帶你進一步了解如何結合 Beam 玩轉 Kafka 和 Flink。
  • Flink Join類型介紹
    今天主要介紹一下Flink目前包含了哪些Join以及具體使用方式,後面的文章會對Join的實現原理進行剖析。
  • 【Flink】小白級入門,Flink sql 的基礎用法
    ,甚至替換成另一個引擎,都可以做到兼容地,平滑地升級,無需更改我們的已經編寫好的 sql 代碼流批統一的基礎對於 flink 通過 sql 的表達式,來完成流批的統一,一套 sql 代碼,既可以跑流任務,也可以跑批任務,減少我們開發的成本Flink sql 使用數據類型-- 字符串類型
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    目前採取的措施是在connect方法後的flatmap的實現的在open 方法中,提前加載一次配置信息,感覺這種實現方式不友好,請問還有其他的實現方式嗎?24、Flink能通過oozie或者azkaban提交嗎?25、不採用yarm部署flink,還有其他的方案嗎? 主要想解決伺服器重啟後,flink服務怎麼自動拉起?
  • Flink SQL 實戰:HBase 的結合應用
    HBase 作為 Google 發表 Big Table 論文的開源實現版本,是一種分布式列式存儲的資料庫,構建在 HDFS 之上的 NoSQL 資料庫,非常適合大規模實時查詢,因此 HBase 在實時計算領域使用非常廣泛。可以實時寫 HBase,也可以利用 buckload 一把把離線 Job 生成 HFile Load 到HBase 表中。
  • 每天5分鐘Flink -WordCount及Flink SQL
    今天咱們也按照這個入門的 Demo,把 Flink 相關代碼捋順。包括 Streaming、Batch 以及 Flink Sql 三方面分別來實現。Streaming WordCount先來分析一個 Streaming WordCount。為了模仿流式計算,咱們在本地利用 netcat命令 nc -l {port}來進行模仿數據產出。
  • 盤點Flink實戰踩過的坑
    :332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive
  • Flink 是如何將你寫的代碼生成 StreamGraph 的 (上篇)
    為什麼要「大致」閱讀,因為這些牛逼的框架都是層層封裝,搞懂核心原理已經是很不易,更別談熟讀源碼了。之前幾篇源碼閱讀的文章,不知道大家有沒有親自動手打開 Idea 去試一試,這裡我再貼一下文章連結,大家可以再回顧一下。本次,我們來聊一聊,我們自己寫的代碼是如何變成 StreamGraph 的。二、引出問題開始之前,不妨稍微回顧一下.
  • FlinkSQL窗口函數篇:易理解與實戰案例
    目的是從最容易理解與近實戰案例方式去讓讀者獲得收益。本篇是FlinkSQL實戰的開篇,歡迎收藏,轉發與持續關注。_1',          |  'properties.group.id'='flink_sql_group_1',          |  'properties.bootstrap.servers' = 'xxxx',          |  'format' = 'json'          |)          |""".stripMargin
  • 原理+代碼|Python基於主成分分析的客戶信貸評級實戰
    >2 - Apriori算法實現智能推薦3 - 隨機森林預測寬帶客戶離網4 - 多元線性回歸模型實戰5 - PCA實現客戶信貸5C評級能夠理解 PCA 的基本原理並將代碼用於實際的業務案例是本文的目標,本文將詳細介紹如何利用Python實現基於主成分分析的5c信用評級,主要分為兩個部分:詳細原理介紹Python代碼實戰
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!
  • 實戰|Kafka + Flink + Redis 的電商大屏實時計算案
    article/1558372本篇涉及到主要技術為Kafka + Flink + Redis,其中,Kafka相關的文章師長之前發過不少,對Kafka不太熟悉的可以先了解下:大白話+13張圖解 KafkaKafka 基本原理
  • 開篇|揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?
    Flink 的 Table 模塊 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,用戶可以像操作表一樣操作數據,非常直觀和方便;SQL作為一種聲明式語言,有著標準的語法和規範,用戶可以不用關心底層實現即可進行數據的處理,非常易於上手,Flink Table API 和 SQL 的實現上有80%左右的代碼是公用的。
  • Structured Streaming與Flink比較
    細分:StreamGraph: 是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。JobGraph: StreamGraph經過優化後生成了JobGraph,提交給 JobManager 的數據結構。