數據同步可用性還是很高的,這兩天有時間便來總結下開源數據同步方案。
個人學習流程:
適合查閱人員:後端開發,測試人員,數據分析工程師。
概念介紹:
ETL(數據倉庫技術):英文Extract-Transform-Load的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。
數據同步劃分
全量同步:
增量同步:
增量同步一般是做實時的同步
開源工具
datax 是阿里開源的etl 工具,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能,採用java+python進行開發,核心是java語言實現。數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步。
Databus是一個實時的、可靠的、支持事務的、保持一致性的數據變更抓取系統。2011年在LinkedIn正式進入生產系統,2013年開源。
Gobblin是用來整合各種數據源的通用型ETL框架,在某種意義上,各種數據都可以在這裡「一站式」的解決ETL整個過程,專為大數據採集而生,易於操作和監控,提供流式抽取支持。主要用於Kafka的數據同步到HDFS。
MongoShake是阿里巴巴Nosql團隊開源出來的一個項目,主要用於mongdb的數據同步到kafka或者其他的mongdb資料庫中。集群數據同步是其中核心應用場景,通過抓取oplog後進行回放達到同步目的,實現災備和多活的業務場景。
FlinkX是一款基於Flink的分布式離線/實時數據同步插件,可實現多種異構數據源高效的數據同步,其由袋鼠雲於2016年初步研發完成,目前有穩定的研發團隊持續維護,已在Github上開源(開源地址詳見文章末尾)。並於今年6年份,完成批流統一,離線計算與流計算的數據同步任務都可基於FlinkX實現。
FlinkX是一個基於Flink的批流統一的數據同步工具,既可以採集靜態的數據,比如MySQL,HDFS等,也可以採集實時變化的數據,比如MySQL binlog,Kafka等。
阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
使用spark任務,通過HQl讀取數據,然後再通過hbase的Api插入到hbase中。
使用BulkLoad可以快速導入,BulkLoad主要是借用了hbase的存儲設計思想,因為hbase本質是存儲在hdfs上的一個文件夾,然後底層是以一個個的Hfile存在的。如果不是hive中的數據,比如外部的數據,那麼我們可以將外部的數據生成文件,然後上傳到hdfs中,組裝RowKey,然後將封裝後的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。
基於sqoop的全量導入
比較總結:
datax一般比較適合於全量數據同步,對全量數據同步效率很高(任務可以拆分,並發同步,所以效率高),對於增量數據同步支持的不太好(可以依靠時間戳+定時調度來實現,但是不能做到實時,延遲較大)。
canal 、databus 等由於是通過日誌抓取的方式進行同步,所以對增量同步支持的比較好。
databus,flinkx活躍度也不是非常高,關注的人還不是很多,MongoShake專為mongdb服務不作考慮。
比較分析,同步服務選用datax,canal使用人多,國人開發,文檔源碼易讀。
本文主要介紹datax,canal兩款工具。
1.全量離線同步datax
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件(插件化是值得總結的技術技巧),以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。
流程:
架構:
核心模塊介紹:
DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
DataX作業運行起來之後, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,進程退出值非0
DataX調度流程:
舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps裡面。DataX的調度決策思路是:
DataXJob根據分庫分表切分成了100個Task。
根據20個並發,DataX計算共需要分配4個TaskGroup。
4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。
優勢:
插件視角看框架
Job:是DataX用來描述從一個源頭到目的的同步作業,是DataX數據同步的最小業務單元;
Task:為最大化而把Job拆分得到最小的執行單元,進行並發執行;
TaskGroup:一組Task集合,在同一個TaskGroupContainer執行下的Task集合稱為TaskGroup;
JobContainer:Job執行器,負責Job全局拆分、調度、前置語句和後置語句等工作的工作單元。類似Yarn中的JobTracker;
TaskGroupContainer:TaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TAskTacker。
即,Job拆分為Task,分別在框架提供的容器中執行,插件只需要實現Job和Task兩部分邏輯。
物理執行有三種運行模式:
總體來說,當JobContainer和TaskGroupContainer運行在同一個進程內的時候就是單機模式,在不同進程執行就是分布式模式。
類型
數據源
Reader(讀)
Writer(寫)
文檔
RDBMS 關係型資料庫
MySQL
√
√
讀 、寫
Oracle
√
√
讀 、寫
SQLServer
√
√
讀 、寫
PostgreSQL
√
√
讀 、寫
DRDS
√
√
讀 、寫
通用RDBMS(支持所有關係型資料庫)
√
√
讀 、寫
阿里雲數倉數據存儲
ODPS
√
√
讀 、寫
ADS
√
寫
OSS
√
√
讀 、寫
OCS
√
√
讀 、寫
NoSQL數據存儲
OTS
√
√
讀 、寫
Hbase0.94
√
√
讀 、寫
Hbase1.1
√
√
讀 、寫
Phoenix4.x
√
√
讀 、寫
Phoenix5.x
√
√
讀 、寫
MongoDB
√
√
讀 、寫
Hive
√
√
讀 、寫
無結構化數據存儲
TxtFile
√
√
讀 、寫
FTP
√
√
讀 、寫
HDFS
√
√
讀 、寫
Elasticsearch
√
寫
時間序列資料庫
OpenTSDB
√
讀
TSDB
√
寫
插件開發指南:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
使用:
環境要求:
(1)、下載DataX源碼: git clone git@github.com:alibaba/DataX.git
(2)、通過maven打包: mvn -U clean package assembly:assembly -Dmaven.test.skip=true
結構如下:bin conf job lib log log_perf plugin script tmp注意連接資料庫的jar(可替換成自己對應的資料庫版本)
可視化界面:
(1)、下載datax-web源碼git clone https://github.com/WeiYe-Jing/datax-web.git
(2)、創建資料庫執行bin/db下面的datax_web.sql文件
(3)、修改配置1.修改datax_admin下resources/application.yml文件2.修改datax_executor下resources/application.yml文件啟動項目:
1.本地idea開發環境
admin啟動成功後日誌會輸出三個地址,兩個接口文檔地址,一個前端頁面地址
啟動成功後打開頁面(默認管理員用戶名:admin 密碼:123456)
http://localhost:8080/index.html#/dashboard
2.配置執行
參數配置說明:
同步規則Json
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [//需要同步的欄位 "district_code", "district_name" ], "connection": [//源資料庫連接 { "jdbcUrl": [ "jdbc:mysql://localhost:3306/cci?characterEncoding=utf8&allowPublicKeyRetrieval=true&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC" ], "table": [//源表 "std_com_code_district" ] } ], "password": "123456", "username": "root", "where": "" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [//目標欄位 "district_code", "district_name" ], "connection": [//目標資料庫連接 { "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cci_canal?useUnicode=true&characterEncoding=utf8&useSSL=false", "table": [ "syn_table_test" ] } ], "password": "123456", "preSql": [], "session": [], "username": "root", "writeMode": "insert" //寫入模式 } } } ], "setting": { "speed": { "channel": "1" } } }}效果:
每天定時同步數據
源表
目標表:
總結:
Datax適合離線全量同步,對於增量同步也有偽方案:
MysqlReader使用JDBC SELECT語句完成數據抽取工作,因此可以使用SELECT…WHERE…進行增量數據抽取,方式有多種:
對於業務上無欄位區分新增、修改數據情況,MysqlReader也無法進行增量數據同步,只能同步全量數據。
以上是datax數據同步,以後數據同步就不需要自己寫定時任務了,簡單的配一配,複雜的在這基礎上再開發。完美!
2.增量實時同步canal
開源的組件,已經實現模擬成一個mysql的slave,拉取binlog的服務:
阿里巴巴開源的canal
美團開源的puma
linkedin開源的databus
增量同步服務要考慮的問題:
如何解決重複插入
如何解決唯一索引衝突
對於DDL語句如何處理
如何解決數據迴環問題
canal/otter基於資料庫的日誌解析,獲取增量變更進行同步,可以實現增量訂閱&消費的業務。canal的最新版本已經實現了GTID,數據重複插入,數據迴環。可以進行DDL語句過濾。
應用場景:
資料庫鏡像
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache刷新
價格變化等重要業務消息
原理:
canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)
Canal內部組件解析
Canal節點,可以有多個instances,每個instance在運行時為一個單獨的Spring Context,對象實例為「CanalInstanceWithSpring」。每個instances有一個單獨的線程處理整個數據流過程。instance內部有EventParser、EventSink、EventStore、metaManager主要四個組件構成,當然還有其他的守護組件比如monitor、HA心跳檢測、ZK事件監聽等。對象實例初始化和依賴關係,可以參見「default-instance.xml」,其配置模式為普通的Spring。(源碼參見:SpringCanalInstanceGenerator)
canal的ha分為兩部分,canal server和canal client分別有對應的ha實現canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.
canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定)
Parser主要用於解析指定"資料庫"的binlog,內部基於JAVA實現的「binlog dump」、「show master status」等。Parser會與ZK交互,並獲取當前instance所有消費者的cursor,並獲其最小值,作為此instance解析binlog的起始position。目前的實現,一個instance同時只能有一個consumer處於active消費狀態,ClientId為定值「1001」,「cursor」中包含consumer消費binlog的position,數字類型。有次可見,Canal instance本身並沒有保存binlog的position,Parser中繼操作是根據consumer的消費cursor位置來決定;對於信息缺失時,比如Canal集群初次online,且在「default-instance.xml」中也沒有指定「masterPositiion」信息(每個instance.properties是可以指定起始position的),那麼將根據「show master status」指令獲取當前binlog的最後位置。(源碼:MysqlEventParser.findStartPosition())
Parser每次、批量獲取一定條數的binlog,將binlog數據封裝成event,並經由EventSink將消息轉發給EventStore,Sink的作用就是「協調Parser和Store」,確保binglog的解析速率與Store隊列容量相容。
EventStore,用於暫存「尚未消費」的events的存儲隊列,默認基於內存的阻塞隊列實現。Store中的數據由Sink組件提交入隊,有NettyServer服務的消費者消費確認後出隊,隊列的容量和容量模式由「canal.properties」中的「memory」相關配置決定。當Store中容量溢滿時,將會阻塞Sink操作(間接阻塞Parser),所以消費者的效能會直接影響instance的同步效率。借鑑了Disruptor的RingBuffer的實現思路。metaManager:主要用於保存Parser組件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta數據,其中Parser組件涉及到的binlog position、CanalServer與消費者交互時ACK的Cursor信息、instance的集群運行時信息等。根據官方解釋,我們在production級別、高可靠業務要求場景下,metaManager建議基於Zookeeper實現。其中有關Position信息由CanalLogPositionManager類負責,其實現類有多個,在Cluster模式下,建議基於FailbackLogPositionManager,其內部有「primary」、「failback」兩級組合,優先基於primary來存取Position,只有當primary異常時會「降級」使用failback;其配置模式,建議與「default-instance.xml」保持一致。
部署使用:
canal的作用就是類似於前面所述的binlog syncer,拉取解析binlog。otter是canal的客戶端,專門用於進行數據同步,類似於前文所講解的sql writer。
安裝步驟:
canal+kafka+mysql+canal-admin聯合部署
docker run --name localdb_mysql -v D:\docker_data\mysql\my.cnf:/etc/mysql/conf.d -v D:\docker_data\mysql\logs:/logs -v D:\docker_data\mysql\data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:5.7.18
mysql資料庫配置(docker安裝可以使用echo寫入)cd /etc/mysql/mysql.conf.decho -e '[mysqld]\npid-file\t= /var/run/mysqld/mysqld.pid\nsocket\t= /var/run/mysqld/mysqld.sock\ndatadir\t= /var/lib/mysql\nsymbolic-links=0\nserver-id = 1\nlog-bin = binlog\nlog-bin-index = binlog.index'>mysqld.cnf#添加這一行就ok log-bin=mysql-bin#選擇row模式binlog-format=ROW#配置mysql replaction需要定義,不能和canal的slaveId重複server_id=1
創建同步帳號CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
//單機測試暫時不用安裝zk和kafka安裝zookeeperdocker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:latest
安裝zookeeper客戶端 =》zkui 或者 prettyZoo
安裝kafkadocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
安裝kafka管理界面docker run -d --name kafka-manager \--link zookeeper:zookeeper \--link kafka:kafka -p 9001:9000 \--restart=always \--env ZK_HOSTS=zookeeper:2181 \sheepkiller/kafka-manager
啟動canal、canal-admin、canal-adaptergit clone https://github.com/alibaba/canal.git
測試kafka安裝成功://創建topicbin/kafka-topics.sh --create --zookeeper zookeeper :2181 --replication-factor 1 --partitions 1 --topic mykafka//查看topicbin/kafka-topics.sh --list --zookeeper zookeeper :2181//創建生產者bin/kafka-console-producer.sh --broker-list zookeeper :9092 --topic mykafka //創建消費者bin/kafka-console-consumer.sh --zookeeper zookeeper :2181 --topic mykafka --from-beginning配置:
canal配置:
##canal.destinations= example ##canal.conf.dir = ../conf # 是否開啟「instance」配置修改自動掃描和重載 #####canal.auto.scan = true canal.auto.scan.interval = 5 #canal.instance.global.mode = spring #canal.instance.global.lazy = false ###canal.instance.global.spring.xml = classpath:spring/default-instance.xml ##canal.id = 1 ##此IP主要為canalServer提供TCP服務而使用,將會被註冊到ZK中,Consumer將與此IP建立連接。canal.ip = #canal.port = 11111 ##canal.zkServers = 10.0.1.21:2818,10.0.1.22,10.0.2.21:2818/canal/g1 # flush data to zk #####canal.zookeeper.flush.period = 1000 ##canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ########canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024 # 所能支撐的事務的最大長度,超過閾值之後,一個事務的消息將會被拆分,並多次提交到eventStore中,但是將無法保證事務的完整性 canal.instance.transaction.size = 1024 # 當instance.properties配置文件中指定「master」、「standby」時,當canal與「master」聯通性故障時,觸發連接源的切換, ##canal.instance.fallbackIntervalInSeconds = 60 ##canal.instance.detecting.enable = true #如果你需要限定某個database的可用性驗證(比如庫鎖), #最好使用複雜的、有效的SQL,比如:insert into {database}.{tmpTable} .... canal.instance.detecting.sql = select 1 #canal.instance.detecting.interval.time = 6 ###canal.instance.detecting.retry.threshold = 5 #如果在instance.properties配置了「master」、「standby」,且此參數開啟時,在「探測失敗」後,會選擇備庫進行binlog獲取 #建議關閉 canal.instance.detecting.heartbeatHaEnable = false # CanalServer、instance有關的TCP網絡配置,建議保持抱人 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # Parser組件,有關binlog解析的過濾 #canal.instance.filter.query.dcl = false #canal.instance.filter.query.dml = false #canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false # binlog格式和「鏡像」格式檢測,建議保持默認 canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # ddl是否隔離發送,保持默認 canal.instance.get.ddl.isolation = false
1.instance列表定義 (列出當前server上有多少個instance,每個instance的加載方式)
參數名字
參數說明
默認值
canal.destinations
當前server上部署的instance列表
無
canal.conf.dir
conf/目錄所在的路徑
../conf
canal.auto.scan
開啟instance自動掃描
如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發:
a. instance目錄新增:觸發instance配置載入,lazy為true時則自動啟動
b. instance目錄刪除:卸載對應instance配置,如已啟動則進行關閉
c. instance.properties文件變化:reload instance配置,如已啟動自動進行重啟操作
true
canal.auto.scan.interval
instance自動掃描的間隔時間,單位秒
5
canal.instance.global.mode
全局配置加載方式
spring
canal.instance.global.lazy
全局lazy模式
false
canal.instance.global.manager.address
全局的manager配置方式的連結信息
無
canal.instance.global.spring.xml
全局的spring配置方式的組件文件
classpath:spring/file-instance.xml
(spring目錄相對於canal.conf.dir)
canal.instance.example.mode
canal.instance.example.lazy
canal.instance.example.spring.xml
…..
instance級別的配置定義,如有配置,會自動覆蓋全局配置定義模式
命名規則:canal.instance.{name}.xxx
無
canal.instance.tsdb.spring.xml
v1.0.25版本新增,全局的tsdb配置方式的組件文件
classpath:spring/tsdb/h2-tsdb.xml
(spring目錄相對於canal.conf.dir)
2.common參數定義,比如可以將instance.properties的公用參數,抽取放置到這裡,這樣每個instance啟動的時候就可以共享.
參數名字
參數說明
默認值
canal.id
每個canal server實例的唯一標識,暫無實際意義
1
canal.ip
canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務
無
canal.port
canal server提供socket服務的埠
11111
canal.zkServers
canal server連結zookeeper集群的連結信息
例子:127.0.0.1:2181,127.0.0.1:2182
無
canal.zookeeper.flush.period
canal持久化數據到zookeeper上的更新頻率,單位毫秒
1000
canal.file.data.dir
canal持久化數據到file上的目錄
../conf (默認和instance.properties為同一目錄,方便運維和備份)
canal.file.flush.period
canal持久化數據到file上的更新頻率,單位毫秒
1000
canal.instance.memory.batch.mode
canal內存store中數據緩存模式
1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量
2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小
MEMSIZE
canal.instance.memory.buffer.size
canal內存store中可緩存buffer記錄數,需要為2的指數
16384
canal.instance.memory.buffer.memunit
內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小
1024
canal.instance.transactionn.size
最大事務完整解析的長度支持
超過該長度後,一個事務可能會被拆分成多次提交到canal store中,無法保證事務的完整可見性
1024
canal.instance.fallbackIntervalInSeconds
canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒
說明:mysql主備庫可能存在解析延遲或者時鐘不統一,需要回退一段時間,保證數據不丟
60
canal.instance.detecting.enable
是否開啟心跳檢查
false
canal.instance.detecting.sql
心跳檢查sql
insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time
心跳檢查頻率,單位秒
3
canal.instance.detecting.retry.threshold
心跳檢查失敗重試次數
3
canal.instance.detecting.heartbeatHaEnable
心跳檢查失敗後,是否開啟自動mysql自動切換
說明:比如心跳檢查失敗超過閥值後,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據
false
canal.instance.network.receiveBufferSize
網絡連結參數,SocketOptions.SO_RCVBUF
16384
canal.instance.network.sendBufferSize
網絡連結參數,SocketOptions.SO_SNDBUF
16384
canal.instance.network.soTimeout
網絡連結參數,SocketOptions.SO_TIMEOUT
30
canal.instance.filter.query.dcl
是否忽略DCL的query語句,比如grant/create user等
false
canal.instance.filter.query.dml
是否忽略DML的query語句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query記錄)
false
canal.instance.filter.query.ddl
是否忽略DDL的query語句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl類型主要為table級別的操作,create databases/trigger/procedure暫時劃分為dcl類型)
false
canal.instance.filter.druid.ddl
v1.0.25版本新增,是否啟用druid的DDL parse的過濾,基於sql的完整parser可以解決之前基於正則匹配補全的問題,默認為true
true
canal.instance.get.ddl.isolation
ddl語句是否隔離發送,開啟隔離可保證每次只返回發送一條ddl數據,不和其他dml語句混合返回.(otter ddl同步使用)
false
conf/example/instance.properties配置文件:
在canal.properties定義了canal.destinations後,需要在canal.conf.dir對應的目錄下建立同名的文件。
如果canal.properties未定義instance列表,但開啟了canal.auto.scan時
1. 發現目錄有新增,啟動新的instance
2. 發現目錄有刪除,關閉老的instance
3. 發現對應目錄的instance.properties有變化,重啟instance
instance.properties參數列表:
####canal.instance.mysql.slaveId = 11110001 # 資料庫相關:master庫 ####canal.instance.master.address = 127.0.0.1:3306 #canal.instance.master.journal.name = #canal.instance.master.position = #canal.instance.master.timestamp = ###canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # 資料庫連接的用戶名和密碼 # 貌似Consumer與CanalServer建立連接時也用的是此用戶名和密碼 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # 默認資料庫 canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # schema過濾規則,類似於MySQL binlog的filter # canal將會過濾那些不符合要求的table,這些table的數據將不會被解析和傳送 # filter格式,Consumer端可以指定,只不過是後置的。#canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex =
參數名字
參數說明
默認值
canal.instance.mysql.slaveId
mysql集群配置中的serverId概念,需要保證和當前mysql集群中id唯一
1234
canal.instance.master.address
mysql主庫連結地址
127.0.0.1:3306
canal.instance.master.journal.name
mysql主庫連結時起始的binlog文件
無
canal.instance.master.position
mysql主庫連結時起始的binlog偏移量
無
canal.instance.master.timestamp
mysql主庫連結時起始的binlog的時間戳
無
canal.instance.dbUsername
mysql資料庫帳號
canal
canal.instance.dbPassword
mysql資料庫密碼
canal
canal.instance.defaultDatabaseName
mysql連結時默認schema
canal.instance.connectionCharset
mysql 數據解析編碼
UTF-8
canal.instance.filter.regex
mysql 數據解析關注的表,Perl正則表達式.
多個正則之間以逗號(,)分隔,轉義符需要雙斜槓(\\)
常見例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表:canal\\..*
3. canal下的以canal打頭的表:canal\\.canal.*
4. canal schema下的一張表:canal.test1
5. 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
注意:此過濾條件只針對row模式的數據有效(ps. mixed/statement因為不解析sql,所以無法準確提取tableName進行過濾)
.*\\..*
canal.instance.tsdb.enable
v1.0.25版本新增,是否開啟table meta的時間序列版本記錄功能
true
canal.instance.tsdb.dir
v1.0.25版本新增,table meta的時間序列版本的本地存儲路徑,默認為instance目錄
{canal.instance.destination:}
canal.instance.tsdb.url
v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結串,比如例子為本地嵌入式資料庫
jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername
v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結帳號
canal
canal.instance.tsdb.dbUsername
v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結密碼
canal
說明:
mysql連結時的起始位置
canalinstance.master.journal.name + canal.instance.master.position : 精確指定一個binlog位點,進行啟動
canal.instance.master.timestamp : 指定一個時間戳,canal會自動遍歷mysql binlog,找到對應時間戳的binlog位點後,進行啟動
不指定任何信息:默認從當前資料庫的位點,進行啟動。(show master status)
測試簡單配置:
#position info,需要改成自己的資料庫信息canal.instance.master.address = 127.0.0.1:3306#username/password,需要改成自己的資料庫信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canal適配器配置:
dataSourceKey: defaultDSdestination: examplegroupId: g1outerAdapterKey: mysconcurrent: truedbMapping: #源資料庫 database: source #源表 table: source_test #目標表 targetTable: target_test targetPk: #源表和目標表的主鍵對應關係 id: id# mapAll: true targetColumns: name: name目標:
對源表進行增刪改操作同步到目標表
啟動canal
啟動rdb適配器=》同步mysql
canal-admin進行配置和實例管理
對源表插入數據
適配器日誌:
2021-02-23 13:24:53.987 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"user3"},"database":"source","destination":"example","old":null,"table":"source_test","type":"INSERT"}目標表:
項目中訂閱:
初步看,如果要實現異地雙活,實時同步這樣的需求,這兩款組件貌似都沒滿足要求,但也不能完全無視其其他用途,比如canal配置zk和使用kafka,可以實現一對多訂閱,實時同步mysql數據,刷redis緩存,elasticsearch搜索庫等常用組件,效果上還是挺實用的。
下一步整理下源碼上寫的有哪些可取的亮點特色。