docker pull debezium/zookeeper
Run the image as a daemon, exposing ports 2181, 2888, and 3888:

docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
--rm removes the container (and its local data) when it stops; -d runs the container in the background.

2. Pull a Kafka image:

docker pull debezium/kafka

docker run -d -it --rm --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --link zookeeper:zookeeper debezium/kafka
--link lets this container communicate with the zookeeper container.

3. Pull a MySQL image and run it in the background:

docker pull debezium/example-mysql
docker run -d -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql
4. Open a separate shell and enter the MySQL command line:

docker run -it --rm --name mysqlterm --link mysql debezium/example-mysql sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
5. Pull a debezium/connect image and start the Kafka Connect service:

docker pull debezium/connect
docker run -d -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect

Once it is up, we can use the REST API to check the status of the Kafka Connect service:
curl -H "Accept:application/json" localhost:8083/

{"version":"2.6.0","commit":"62abe01bee039651","kafka_cluster_id":"vkx8c6lhT1emLtPSi-ge6w"}
Use the REST API to see which connectors are registered with Kafka Connect:
curl -H "Accept:application/json" localhost:8083/connectors/

This currently returns an empty array, meaning no connectors are registered yet.
6. Register a connector to watch the MySQL database for changes. To register one, send a POST request to the Kafka Connect REST API. Notes on the configuration:

- tasks.max should be set to 1: the MySQL connector reads the MySQL binlog, and a single task is required to guarantee correct ordering;
- database.hostname is set to mysql here, which is a hostname; outside of Docker, configure an IP address or a resolvable domain name instead;
- database.server.name is a unique logical server name that is used as the prefix of the Kafka topic names.

Run the following command to send the POST request and register the connector with Kafka Connect:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
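To see how the connector config determines the topic consumed in step 7, here is a minimal sketch of the naming rule (server name, database, and table joined by dots); the helper function is illustrative, not part of Debezium:

```python
# Debezium names per-table change-event topics "<server.name>.<database>.<table>".
# This sketch derives the topic watched in step 7 from the config registered above.
config = {
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
}

def change_topic(server_name: str, database: str, table: str) -> str:
    """Build the Kafka topic name Debezium uses for a captured table."""
    return f"{server_name}.{database}.{table}"

topic = change_topic(config["database.server.name"],
                     config["database.include.list"], "customers")
print(topic)  # dbserver1.inventory.customers
```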
7. Open a new shell and start a Kafka consumer to watch the events Debezium sends:

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.inventory.customers
8. In the MySQL command-line window, modify a row:

use inventory;
UPDATE customers SET first_name='Anne211' WHERE id=1004;
9. Watch the Kafka consumer window: two pieces of JSON arrive, one carrying the key (the primary key of the updated row) and one carrying the actual update.
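The value message wraps the change in a Debezium envelope with before/after images and an op code (with 'debezium-json.schema-include' = 'true' it is additionally wrapped in a schema/payload pair). A minimal parsing sketch, using a hand-written payload shaped like the UPDATE above (field values are illustrative, not captured output):

```python
import json

# Simplified Debezium update event for the UPDATE above; a real message also
# carries source metadata and, with schema-include, a "schema" section.
raw = json.dumps({
    "payload": {
        "before": {"id": 1004, "first_name": "Anne"},
        "after": {"id": 1004, "first_name": "Anne211"},
        "op": "u",  # c = create, u = update, d = delete
    }
})

event = json.loads(raw)["payload"]
if event["op"] == "u":
    # keep only the columns whose value actually changed
    changed = {k: v for k, v in event["after"].items()
               if event["before"].get(k) != v}
    print(changed)  # {'first_name': 'Anne211'}
```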
5. Syncing Data by Integrating Flink with Debezium

Next we use Flink to consume the data produced by Debezium and sync all the changes into another table. The code is as follows:

package com.hudsun.flink.cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @Author wangkai
 * @Time 2020/12/22 23:11
 */
public class DebeziumCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSettings = EnvironmentSettings.newInstance()
                .inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(env, streamSettings);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // Debezium writes the captured changes to this topic
        String topicName = "dbserver1.inventory.customers";
        String bootStrpServers = "192.168.56.10:9092";
        String groupID = "testGroup";
        // target database address
        String url = "jdbc:mysql://192.168.56.10:3306/inventory";
        String userName = "root";
        String password = "debezium";
        String mysqlSinkTable = "customers_copy";
        // create a table backed by the Kafka topic as the source
        tableEnvironment.executeSql("CREATE TABLE customers (\n" +
                " id INT,\n" +
                " first_name STRING,\n" +
                " last_name STRING,\n" +
                " email STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '" + topicName + "',\n" +
                " 'properties.bootstrap.servers' = '" + bootStrpServers + "',\n" +
                " 'debezium-json.schema-include' = 'true',\n" +
                " 'properties.group.id' = '" + groupID + "',\n" +
                " 'format' = 'debezium-json'\n" +
                ")");
        // create a sink table to write the data into
        tableEnvironment.executeSql("CREATE TABLE customers_copy (\n" +
                " id INT,\n" +
                " first_name STRING,\n" +
                " last_name STRING,\n" +
                " email STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + mysqlSinkTable + "'\n" +
                ")");
        String updateSQL = "insert into customers_copy select * from customers";
        // executeSql submits the INSERT job itself; no separate env.execute() is needed
        TableResult result = tableEnvironment.executeSql(updateSQL);
    }
}