數據同步中間件服務

2021-02-25 土豆仙

   數據同步可用性還是很高的,這兩天有時間便來總結下開源數據同步方案。

   個人學習流程:

   適合查閱人員:後端開發,測試人員,數據分析工程師。


概念介紹:

ETL(數據倉庫技術):英文Extract-Transform-Load的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。


數據同步劃分

全量同步:

增量同步:

增量同步一般是做實時的同步

開源工具

    datax 是阿里開源的etl 工具,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能,採用java+python進行開發,核心是java語言實現。數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步。

    Databus是一個實時的、可靠的、支持事務的、保持一致性的數據變更抓取系統。2011年在LinkedIn正式進入生產系統,2013年開源。

  Gobblin是用來整合各種數據源的通用型ETL框架,在某種意義上,各種數據都可以在這裡「一站式」的解決ETL整個過程,專為大數據採集而生,易於操作和監控,提供流式抽取支持。主要用於Kafka的數據同步到HDFS。

 MongoShake是阿里巴巴Nosql團隊開源出來的一個項目,主要用於mongdb的數據同步到kafka或者其他的mongdb資料庫中。集群數據同步是其中核心應用場景,通過抓取oplog後進行回放達到同步目的,實現災備和多活的業務場景。

  FlinkX是一款基於Flink的分布式離線/實時數據同步插件,可實現多種異構數據源高效的數據同步,其由袋鼠雲於2016年初步研發完成,目前有穩定的研發團隊持續維護,已在Github上開源(開源地址詳見文章末尾)。並於今年6年份,完成批流統一,離線計算與流計算的數據同步任務都可基於FlinkX實現。

  FlinkX是一個基於Flink的批流統一的數據同步工具,既可以採集靜態的數據,比如MySQL,HDFS等,也可以採集實時變化的數據,比如MySQL binlog,Kafka等。

  阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

使用spark任務,通過HQl讀取數據,然後再通過hbase的Api插入到hbase中。

使用BulkLoad可以快速導入,BulkLoad主要是借用了hbase的存儲設計思想,因為hbase本質是存儲在hdfs上的一個文件夾,然後底層是以一個個的Hfile存在的。如果不是hive中的數據,比如外部的數據,那麼我們可以將外部的數據生成文件,然後上傳到hdfs中,組裝RowKey,然後將封裝後的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。

基於sqoop的全量導入

比較總結:

  datax一般比較適合於全量數據同步,對全量數據同步效率很高(任務可以拆分,並發同步,所以效率高),對於增量數據同步支持的不太好(可以依靠時間戳+定時調度來實現,但是不能做到實時,延遲較大)。

    canal 、databus 等由於是通過日誌抓取的方式進行同步,所以對增量同步支持的比較好。

databus,flinkx活躍度也不是非常高,關注的人還不是很多,MongoShake專為mongdb服務不作考慮。

    比較分析,同步服務選用datax,canal使用人多,國人開發,文檔源碼易讀。

本文主要介紹datax,canal兩款工具。

1.全量離線同步datax

   DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件(插件化是值得總結的技術技巧),以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。

流程:

架構:

核心模塊介紹:

DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。

DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。

切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。

每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。

DataX作業運行起來之後, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,進程退出值非0

DataX調度流程:

  舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps裡面。DataX的調度決策思路是:

DataXJob根據分庫分表切分成了100個Task。

根據20個並發,DataX計算共需要分配4個TaskGroup。

4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。

優勢:

插件視角看框架

Job:是DataX用來描述從一個源頭到目的的同步作業,是DataX數據同步的最小業務單元;

Task:為最大化而把Job拆分得到最小的執行單元,進行並發執行;

TaskGroup:一組Task集合,在同一個TaskGroupContainer執行下的Task集合稱為TaskGroup;

JobContainer:Job執行器,負責Job全局拆分、調度、前置語句和後置語句等工作的工作單元。類似Yarn中的JobTracker;

TaskGroupContainer:TaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TAskTacker。

    即,Job拆分為Task,分別在框架提供的容器中執行,插件只需要實現Job和Task兩部分邏輯。

    物理執行有三種運行模式:

    總體來說,當JobContainer和TaskGroupContainer運行在同一個進程內的時候就是單機模式,在不同進程執行就是分布式模式。

類型

數據源

Reader(讀)

Writer(寫)

文檔

RDBMS 關係型資料庫

MySQL

讀 、寫


Oracle    

    √    

    √    

讀 、寫


SQLServer

讀 、寫


PostgreSQL

讀 、寫


DRDS

讀 、寫


通用RDBMS(支持所有關係型資料庫)

讀 、寫

阿里雲數倉數據存儲

ODPS

讀 、寫


ADS



OSS

讀 、寫


OCS

讀 、寫

NoSQL數據存儲

OTS

讀 、寫


Hbase0.94

讀 、寫


Hbase1.1

讀 、寫


Phoenix4.x

讀 、寫


Phoenix5.x

讀 、寫


MongoDB

讀 、寫


Hive

讀 、寫

無結構化數據存儲

TxtFile

讀 、寫


FTP

讀 、寫


HDFS

讀 、寫


Elasticsearch


時間序列資料庫

OpenTSDB



TSDB


插件開發指南:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

使用:

環境要求:

(1)、下載DataX源碼: git clone git@github.com:alibaba/DataX.git
(2)、通過maven打包: mvn -U clean package assembly:assembly -Dmaven.test.skip=true
 結構如下:bin conf job lib log log_perf plugin script tmp

注意連接資料庫的jar(可替換成自己對應的資料庫版本)

可視化界面:

(1)、下載datax-web源碼git clone https://github.com/WeiYe-Jing/datax-web.git
(2)、創建資料庫執行bin/db下面的datax_web.sql文件
(3)、修改配置1.修改datax_admin下resources/application.yml文件2.修改datax_executor下resources/application.yml文件

啟動項目:

1.本地idea開發環境

admin啟動成功後日誌會輸出三個地址,兩個接口文檔地址,一個前端頁面地址

啟動成功後打開頁面(默認管理員用戶名:admin 密碼:123456)

http://localhost:8080/index.html#/dashboard

2.配置執行

參數配置說明:

同步規則Json

{  "job": {    "content": [      {        "reader": {          "name": "mysqlreader",          "parameter": {            "column": [//需要同步的欄位              "district_code",              "district_name"            ],            "connection": [//源資料庫連接              {                "jdbcUrl": [                  "jdbc:mysql://localhost:3306/cci?characterEncoding=utf8&allowPublicKeyRetrieval=true&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC"                ],                "table": [//源表                  "std_com_code_district"                ]              }            ],            "password": "123456",            "username": "root",            "where": ""          }        },        "writer": {          "name": "mysqlwriter",          "parameter": {            "column": [//目標欄位              "district_code",              "district_name"            ],            "connection": [//目標資料庫連接              {                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/cci_canal?useUnicode=true&characterEncoding=utf8&useSSL=false",                "table": [                  "syn_table_test"                ]              }            ],            "password": "123456",            "preSql": [],            "session": [],            "username": "root",            "writeMode": "insert" //寫入模式          }        }      }    ],    "setting": {      "speed": {        "channel": "1"      }    }  }}

效果:

每天定時同步數據

源表

目標表:

總結:

    Datax適合離線全量同步,對於增量同步也有偽方案:

    MysqlReader使用JDBC SELECT語句完成數據抽取工作,因此可以使用SELECT…WHERE…進行增量數據抽取,方式有多種:

   對於業務上無欄位區分新增、修改數據情況,MysqlReader也無法進行增量數據同步,只能同步全量數據。

  以上是datax數據同步,以後數據同步就不需要自己寫定時任務了,簡單的配一配,複雜的在這基礎上再開發。完美!

2.增量實時同步canal

開源的組件,已經實現模擬成一個mysql的slave,拉取binlog的服務:

阿里巴巴開源的canal

美團開源的puma

linkedin開源的databus  

增量同步服務要考慮的問題:

如何解決重複插入

如何解決唯一索引衝突

對於DDL語句如何處理

如何解決數據迴環問題

canal/otter基於資料庫的日誌解析,獲取增量變更進行同步,可以實現增量訂閱&消費的業務。canal的最新版本已經實現了GTID,數據重複插入,數據迴環。可以進行DDL語句過濾。

應用場景:

資料庫鏡像

資料庫實時備份

多級索引 (賣家和買家各自分庫索引)

search build

業務cache刷新

價格變化等重要業務消息

原理:

canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議

mysql master收到dump請求,開始推送binary log給slave(也就是canal)

canal解析binary log對象(原始為byte流)

Canal內部組件解析

Canal節點,可以有多個instances,每個instance在運行時為一個單獨的Spring Context,對象實例為「CanalInstanceWithSpring」。每個instances有一個單獨的線程處理整個數據流過程。instance內部有EventParser、EventSink、EventStore、metaManager主要四個組件構成,當然還有其他的守護組件比如monitor、HA心跳檢測、ZK事件監聽等。對象實例初始化和依賴關係,可以參見「default-instance.xml」,其配置模式為普通的Spring。(源碼參見:SpringCanalInstanceGenerator)

   canal的ha分為兩部分,canal server和canal client分別有對應的ha實現canal server:  為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.

      canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定)

Parser主要用於解析指定"資料庫"的binlog,內部基於JAVA實現的「binlog dump」、「show master status」等。Parser會與ZK交互,並獲取當前instance所有消費者的cursor,並獲其最小值,作為此instance解析binlog的起始position。目前的實現,一個instance同時只能有一個consumer處於active消費狀態,ClientId為定值「1001」,「cursor」中包含consumer消費binlog的position,數字類型。有次可見,Canal instance本身並沒有保存binlog的position,Parser中繼操作是根據consumer的消費cursor位置來決定;對於信息缺失時,比如Canal集群初次online,且在「default-instance.xml」中也沒有指定「masterPositiion」信息(每個instance.properties是可以指定起始position的),那麼將根據「show master status」指令獲取當前binlog的最後位置。(源碼:MysqlEventParser.findStartPosition())

Parser每次、批量獲取一定條數的binlog,將binlog數據封裝成event,並經由EventSink將消息轉發給EventStore,Sink的作用就是「協調Parser和Store」,確保binglog的解析速率與Store隊列容量相容。

EventStore,用於暫存「尚未消費」的events的存儲隊列,默認基於內存的阻塞隊列實現。Store中的數據由Sink組件提交入隊,有NettyServer服務的消費者消費確認後出隊,隊列的容量和容量模式由「canal.properties」中的「memory」相關配置決定。當Store中容量溢滿時,將會阻塞Sink操作(間接阻塞Parser),所以消費者的效能會直接影響instance的同步效率。借鑑了Disruptor的RingBuffer的實現思路。metaManager:主要用於保存Parser組件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta數據,其中Parser組件涉及到的binlog position、CanalServer與消費者交互時ACK的Cursor信息、instance的集群運行時信息等。根據官方解釋,我們在production級別、高可靠業務要求場景下,metaManager建議基於Zookeeper實現。其中有關Position信息由CanalLogPositionManager類負責,其實現類有多個,在Cluster模式下,建議基於FailbackLogPositionManager,其內部有「primary」、「failback」兩級組合,優先基於primary來存取Position,只有當primary異常時會「降級」使用failback;其配置模式,建議與「default-instance.xml」保持一致。


部署使用:

    canal的作用就是類似於前面所述的binlog syncer,拉取解析binlog。otter是canal的客戶端,專門用於進行數據同步,類似於前文所講解的sql writer。

安裝步驟:

canal+kafka+mysql+canal-admin聯合部署

docker run --name localdb_mysql -v D:\docker_data\mysql\my.cnf:/etc/mysql/conf.d -v D:\docker_data\mysql\logs:/logs -v D:\docker_data\mysql\data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:5.7.18
mysql資料庫配置(docker安裝可以使用echo寫入)cd /etc/mysql/mysql.conf.decho -e '[mysqld]\npid-file\t= /var/run/mysqld/mysqld.pid\nsocket\t= /var/run/mysqld/mysqld.sock\ndatadir\t= /var/lib/mysql\nsymbolic-links=0\nserver-id = 1\nlog-bin = binlog\nlog-bin-index = binlog.index'>mysqld.cnf#添加這一行就ok log-bin=mysql-bin#選擇row模式binlog-format=ROW#配置mysql replaction需要定義,不能和canal的slaveId重複server_id=1
創建同步帳號CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
//單機測試暫時不用安裝zk和kafka安裝zookeeperdocker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:latest
安裝zookeeper客戶端  =》zkui  或者 prettyZoo
安裝kafkadocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
安裝kafka管理界面docker run -d --name kafka-manager \--link zookeeper:zookeeper \--link kafka:kafka -p 9001:9000 \--restart=always \--env ZK_HOSTS=zookeeper:2181 \sheepkiller/kafka-manager
啟動canal、canal-admin、canal-adaptergit clone https://github.com/alibaba/canal.git
測試kafka安裝成功://創建topicbin/kafka-topics.sh --create --zookeeper zookeeper :2181 --replication-factor 1 --partitions 1 --topic mykafka//查看topicbin/kafka-topics.sh --list --zookeeper zookeeper :2181//創建生產者bin/kafka-console-producer.sh --broker-list zookeeper :9092 --topic mykafka //創建消費者bin/kafka-console-consumer.sh --zookeeper zookeeper :2181 --topic mykafka --from-beginning

配置:

canal配置:

##canal.destinations= example  ##canal.conf.dir = ../conf  # 是否開啟「instance」配置修改自動掃描和重載  #####canal.auto.scan = true  canal.auto.scan.interval = 5        #canal.instance.global.mode = spring  #canal.instance.global.lazy = false  ###canal.instance.global.spring.xml = classpath:spring/default-instance.xml           ##canal.id = 1  ##此IP主要為canalServer提供TCP服務而使用,將會被註冊到ZK中,Consumer將與此IP建立連接。canal.ip =  #canal.port = 11111  ##canal.zkServers = 10.0.1.21:2818,10.0.1.22,10.0.2.21:2818/canal/g1  # flush data to zk  #####canal.zookeeper.flush.period = 1000  ##canal.file.data.dir = ${canal.conf.dir}  canal.file.flush.period = 1000  ########canal.instance.memory.batch.mode = MEMSIZE  canal.instance.memory.buffer.size = 16384  canal.instance.memory.buffer.memunit = 1024        # 所能支撐的事務的最大長度,超過閾值之後,一個事務的消息將會被拆分,並多次提交到eventStore中,但是將無法保證事務的完整性  canal.instance.transaction.size =  1024  # 當instance.properties配置文件中指定「master」、「standby」時,當canal與「master」聯通性故障時,觸發連接源的切換,  ##canal.instance.fallbackIntervalInSeconds = 60        ##canal.instance.detecting.enable = true  #如果你需要限定某個database的可用性驗證(比如庫鎖),  #最好使用複雜的、有效的SQL,比如:insert into {database}.{tmpTable} ....  canal.instance.detecting.sql = select 1  #canal.instance.detecting.interval.time = 6  ###canal.instance.detecting.retry.threshold = 5  #如果在instance.properties配置了「master」、「standby」,且此參數開啟時,在「探測失敗」後,會選擇備庫進行binlog獲取  #建議關閉  canal.instance.detecting.heartbeatHaEnable = false        # CanalServer、instance有關的TCP網絡配置,建議保持抱人  canal.instance.network.receiveBufferSize = 16384  canal.instance.network.sendBufferSize = 16384  canal.instance.network.soTimeout = 30     # Parser組件,有關binlog解析的過濾  #canal.instance.filter.query.dcl = false  #canal.instance.filter.query.dml = false  #canal.instance.filter.query.ddl = false  canal.instance.filter.table.error = false  canal.instance.filter.rows = false        # binlog格式和「鏡像」格式檢測,建議保持默認  canal.instance.binlog.format = ROW,STATEMENT,MIXED  canal.instance.binlog.image = FULL,MINIMAL,NOBLOB        # ddl是否隔離發送,保持默認  canal.instance.get.ddl.isolation = false


1.instance列表定義 (列出當前server上有多少個instance,每個instance的加載方式)

參數名字

參數說明

默認值

canal.destinations

當前server上部署的instance列表

canal.conf.dir

conf/目錄所在的路徑

../conf

canal.auto.scan

開啟instance自動掃描

如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發:

a. instance目錄新增:觸發instance配置載入,lazy為true時則自動啟動

b. instance目錄刪除:卸載對應instance配置,如已啟動則進行關閉

c. instance.properties文件變化:reload instance配置,如已啟動自動進行重啟操作

true

canal.auto.scan.interval

instance自動掃描的間隔時間,單位秒

5

canal.instance.global.mode

全局配置加載方式

spring

canal.instance.global.lazy

全局lazy模式

false

canal.instance.global.manager.address

全局的manager配置方式的連結信息

canal.instance.global.spring.xml

全局的spring配置方式的組件文件

classpath:spring/file-instance.xml 

(spring目錄相對於canal.conf.dir)

canal.instance.example.mode

canal.instance.example.lazy

canal.instance.example.spring.xml

…..

instance級別的配置定義,如有配置,會自動覆蓋全局配置定義模式

命名規則:canal.instance.{name}.xxx

canal.instance.tsdb.spring.xml

v1.0.25版本新增,全局的tsdb配置方式的組件文件

classpath:spring/tsdb/h2-tsdb.xml 

(spring目錄相對於canal.conf.dir)

2.common參數定義,比如可以將instance.properties的公用參數,抽取放置到這裡,這樣每個instance啟動的時候就可以共享.

參數名字

參數說明

默認值

canal.id

每個canal server實例的唯一標識,暫無實際意義

1

canal.ip

canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務

canal.port

canal server提供socket服務的埠

11111

canal.zkServers

canal server連結zookeeper集群的連結信息

例子:127.0.0.1:2181,127.0.0.1:2182

canal.zookeeper.flush.period

canal持久化數據到zookeeper上的更新頻率,單位毫秒

1000

canal.file.data.dir

canal持久化數據到file上的目錄

../conf (默認和instance.properties為同一目錄,方便運維和備份)

canal.file.flush.period

canal持久化數據到file上的更新頻率,單位毫秒

1000

canal.instance.memory.batch.mode

canal內存store中數據緩存模式

1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量

2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小

MEMSIZE

canal.instance.memory.buffer.size

canal內存store中可緩存buffer記錄數,需要為2的指數

16384

canal.instance.memory.buffer.memunit

內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小

1024

canal.instance.transactionn.size

最大事務完整解析的長度支持

超過該長度後,一個事務可能會被拆分成多次提交到canal store中,無法保證事務的完整可見性

1024

canal.instance.fallbackIntervalInSeconds

canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒

說明:mysql主備庫可能存在解析延遲或者時鐘不統一,需要回退一段時間,保證數據不丟

60

canal.instance.detecting.enable

是否開啟心跳檢查

false

canal.instance.detecting.sql

心跳檢查sql

insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.interval.time

心跳檢查頻率,單位秒

3

canal.instance.detecting.retry.threshold

心跳檢查失敗重試次數

3

canal.instance.detecting.heartbeatHaEnable

心跳檢查失敗後,是否開啟自動mysql自動切換

說明:比如心跳檢查失敗超過閥值後,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據

false

canal.instance.network.receiveBufferSize

網絡連結參數,SocketOptions.SO_RCVBUF

16384

canal.instance.network.sendBufferSize

網絡連結參數,SocketOptions.SO_SNDBUF

16384

canal.instance.network.soTimeout

網絡連結參數,SocketOptions.SO_TIMEOUT

30

canal.instance.filter.query.dcl

是否忽略DCL的query語句,比如grant/create user等

false

canal.instance.filter.query.dml

是否忽略DML的query語句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query記錄)

false

canal.instance.filter.query.ddl

是否忽略DDL的query語句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl類型主要為table級別的操作,create databases/trigger/procedure暫時劃分為dcl類型)

false

canal.instance.filter.druid.ddl

v1.0.25版本新增,是否啟用druid的DDL parse的過濾,基於sql的完整parser可以解決之前基於正則匹配補全的問題,默認為true

true

canal.instance.get.ddl.isolation

ddl語句是否隔離發送,開啟隔離可保證每次只返回發送一條ddl數據,不和其他dml語句混合返回.(otter ddl同步使用)

false

conf/example/instance.properties配置文件:

在canal.properties定義了canal.destinations後,需要在canal.conf.dir對應的目錄下建立同名的文件。

如果canal.properties未定義instance列表,但開啟了canal.auto.scan時

1. 發現目錄有新增,啟動新的instance

2. 發現目錄有刪除,關閉老的instance

3. 發現對應目錄的instance.properties有變化,重啟instance

instance.properties參數列表:

####canal.instance.mysql.slaveId = 11110001     # 資料庫相關:master庫  ####canal.instance.master.address = 127.0.0.1:3306  #canal.instance.master.journal.name =  #canal.instance.master.position =  #canal.instance.master.timestamp =     ###canal.instance.standby.address =  #canal.instance.standby.journal.name =  #canal.instance.standby.position =  #canal.instance.standby.timestamp =     # 資料庫連接的用戶名和密碼  # 貌似Consumer與CanalServer建立連接時也用的是此用戶名和密碼  canal.instance.dbUsername = canal  canal.instance.dbPassword = canal  # 默認資料庫  canal.instance.defaultDatabaseName =  canal.instance.connectionCharset = UTF-8     # schema過濾規則,類似於MySQL binlog的filter  # canal將會過濾那些不符合要求的table,這些table的數據將不會被解析和傳送  # filter格式,Consumer端可以指定,只不過是後置的。#canal.instance.filter.regex = .*\\..*  # table black regex  canal.instance.filter.black.regex =


參數名字

參數說明

默認值

canal.instance.mysql.slaveId

mysql集群配置中的serverId概念,需要保證和當前mysql集群中id唯一

1234

canal.instance.master.address

mysql主庫連結地址

127.0.0.1:3306

canal.instance.master.journal.name

mysql主庫連結時起始的binlog文件

canal.instance.master.position

mysql主庫連結時起始的binlog偏移量

canal.instance.master.timestamp

mysql主庫連結時起始的binlog的時間戳

canal.instance.dbUsername

mysql資料庫帳號

canal

canal.instance.dbPassword

mysql資料庫密碼

canal

canal.instance.defaultDatabaseName

mysql連結時默認schema


canal.instance.connectionCharset

mysql 數據解析編碼

UTF-8

canal.instance.filter.regex

mysql 數據解析關注的表,Perl正則表達式.

多個正則之間以逗號(,)分隔,轉義符需要雙斜槓(\\) 

常見例子:

1.  所有表:.*   or  .*\\..*

2.  canal schema下所有表:canal\\..*

3.  canal下的以canal打頭的表:canal\\.canal.*

4.  canal schema下的一張表:canal.test1

5.  多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)

注意:此過濾條件只針對row模式的數據有效(ps. mixed/statement因為不解析sql,所以無法準確提取tableName進行過濾)

.*\\..*

canal.instance.tsdb.enable

v1.0.25版本新增,是否開啟table meta的時間序列版本記錄功能

true

canal.instance.tsdb.dir

v1.0.25版本新增,table meta的時間序列版本的本地存儲路徑,默認為instance目錄

{canal.instance.destination:}

canal.instance.tsdb.url

v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結串,比如例子為本地嵌入式資料庫

jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結帳號

canal

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的時間序列版本存儲的資料庫連結密碼

canal

說明:

mysql連結時的起始位置

canalinstance.master.journal.name + canal.instance.master.position : 精確指定一個binlog位點,進行啟動

canal.instance.master.timestamp : 指定一個時間戳,canal會自動遍歷mysql binlog,找到對應時間戳的binlog位點後,進行啟動

不指定任何信息:默認從當前資料庫的位點,進行啟動。(show master status)



測試簡單配置:

#position info,需要改成自己的資料庫信息canal.instance.master.address = 127.0.0.1:3306#username/password,需要改成自己的資料庫信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canal

適配器配置:

dataSourceKey: defaultDSdestination: examplegroupId: g1outerAdapterKey: mysconcurrent: truedbMapping:  #源資料庫  database: source  #源表  table: source_test  #目標表  targetTable: target_test  targetPk:    #源表和目標表的主鍵對應關係    id: id#  mapAll: true  targetColumns:    name: name

目標:

對源表進行增刪改操作同步到目標表

啟動canal

啟動rdb適配器=》同步mysql

canal-admin進行配置和實例管理

對源表插入數據

適配器日誌:

2021-02-23 13:24:53.987 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"user3"},"database":"source","destination":"example","old":null,"table":"source_test","type":"INSERT"}

目標表:

項目中訂閱:

  初步看,如果要實現異地雙活,實時同步這樣的需求,這兩款組件貌似都沒滿足要求,但也不能完全無視其其他用途,比如canal配置zk和使用kafka,可以實現一對多訂閱,實時同步mysql數據,刷redis緩存,elasticsearch搜索庫等常用組件,效果上還是挺實用的。

    下一步整理下源碼上寫的有哪些可取的亮點特色。

相關焦點

  • 數據同步中間件 Porter 3.0 發布,新增單機模式
    數據同步中間件 Porter 3.0發布。
  • 說說面試官:說說對Redux中間件的理解?常用的中間件有哪些?實現原理?
    一、是什麼中間件(Middleware)在計算機中,是介於應用系統和系統軟體之間的一類軟體,它使用系統軟體所提供的基礎服務(功能),銜接網絡上應用系統的各個部分或不同的應用,能夠達到資源共享、功能共享的目的在這篇文章中,了解到了Redux整個工作流程,當action發出之後,reducer
  • 中創中間件InforSuite SIBV8.2.1發版
    中創中間件InforSuite SIB V8.2.1於2011 年5月9 日完成確認測試並於日前正式發版。  與上一版本相比,InforSuite SIB V8.2.1在在數據集成領域新增增量抽取模式下DBSource集群構件和視圖算子構件兩個功能。
  • 會員實踐:中國中間件的開拓者東方通入會,共促數據互聯互通
    可以說,中國成為全球第一數據大國指日可待,成為數據強國還需加強數據互聯互通,深化數據應用。對此,貴陽大數據交易所作為全球乃至全國第一家大數據交易所,在2018數博會上適時出臺「數+12」戰略,穩步擴大數據交易朋友圈,共同促進數據資源跨地域、跨行業有序流通,推動數據資源化、資產化、資本化,服務「數字中國」、21世紀數字絲綢之路等的建設。
  • 3分鐘短文|Laravel 中間件傳遞數據到控制器
    一般我們也是這麼做的,但是你想過沒有,中間件如何傳遞數據到下遊? 本文就來說一說。學習時間比如有一個需求,根據用戶身份,判斷其是否可以訪問某個頁面。先註冊一個路由地址,在 route.php 文件內添加如下參數:Route::get('pages/{id}', ['as' => 'pages', 'middleware' => 'pageUser', 'uses' => 'PagesController@view' ]);接著是實現 PageUserMiddleware.php 中間件邏輯
  • 看完這篇,還怕面試官問消息中間件麼?
    說到消息中間件,工作中經常會用到MQ消息中間件,常見的消息中間件有Apache的ActiveMQ以及RabbitMQ。不管是ActiveMQ還是RabbitMQ都是基於JMS規範的消息中間件,它們都是消息服務的「提供者」。那麼什麼是 JMS?
  • 克麗發現:中間件企業智能中樞對話寫實
    何力健:其實我對中間件的了解不是很多,但我覺得中間件是必須的,因為在操作平臺和應用上,一定有一個集成的層面。如果是對單一應用系統來說,中間件可能意義不大。但是整個信息系統的開發和應用面對的是集成,在我們的應用系統中,不是單一廠商開發的,也不會用單一的平臺做企業的IT設計,所以一定會涉及到數據傳遞等方面的應用。
  • 【金三銀四】Java中間件面試題(2021最新版)
    而消息中間件在其中起到了一個中間橋梁的重要作用。因此,面試中也經常會被問到消息中間件相關的問題。從其使用到其原理設計,都會是面試官感興趣的一個點。Zookeeper1. ZooKeeper 是什麼?2. ZooKeeper 提供了什麼?
  • 計算機行業專題:如何撬動中間件市場國際領先者的奶酪
    計世資訊數據顯示,2018年僅IBM、Oracle兩家市場份額就高達50%,佔據中間件市場前兩位。並且,從技術服務到市場執行,IBM、Oracle均屬於行業領導者。二、國內中間件需要面對的挑戰和產業機會(一)挑戰一:IBM、Oracle 具備成體系的產品整合能力強國內研發中間件並不晚。
  • 國內中間件廠商強勢崛起 寶蘭德拓寬技術護城河
    中間件企業眾多,為什麼中國規模最大的移動通信運營商選擇了寶蘭德呢?公司方面對證券時報記者介紹說,主要來自於公司在技術、產品線、服務和管理四個方面所積聚起來的相對優勢。而在技術基礎方面,寶蘭德在中間件領域和智能運維領域擁有18項核心技術,同時還有26項技術儲備,技術覆蓋雲計算、大數據、人工智慧、分布式資料庫等技術前沿;具備優質的客戶基礎,最終客戶為電信、金融、政府行業的主要大型企業或組織。
  • 第三屆信也科技技術沙龍:揭秘消息中間件的原理與實踐
    本次沙龍的主題為《消息中間件核心原理揭秘與最佳實踐》,主辦方信也科技邀請了來自攜程旅遊、中通科技、信也科技等多家知名網際網路公司的資深專家,共聚一堂為現場觀眾傳道、授業、解惑,給大家帶來了一場精彩的消息中間件分享大會。信也科技布道師、基礎組件架構研究員赫傑輝主持了本次活動。
  • 基於ActiveMQ的消息中間件系統 OneMM邏輯與物理架構設計詳解
    隨著企業信息化建設的不斷深入,多種業務應用相互關聯,容易造成底層數據分散,應用系統間的耦合度高。針對該問題應從整體上調整目前系統架構,面向不同業務應用提供統一的數據訪問服務,使用消息中間件對不同系統間的交互進行解耦,消息中間件技術有兩個核心功能:異步和解耦。這兩個核心功能整體上提高了應用系統的工作效率,增強了系統的可用性、穩定性和可擴展性,提升了用戶體驗。
  • Node.js + Express中間件詳解
    使用中間件 Express是一種路由和中間件Web框架,它具有自己的最小功能:Express應用程式本質上是一系列中間件函數調用。中間件功能可以執行以下任務: 如果當前的中間件函數沒有結束請求 - 響應周期,則必須調用next()以將控制傳遞給下一個中間件函數。否則,請求將被掛起。Express應用程式可以使用以下類型的中間件: 您可以使用可選的裝載路徑加載應用程式級和路由器級中間件。
  • 前端如何正確使用中間件?
    阿里妹導讀:中間件可以算是一種前端中常用的」設計模式「了,有的時候甚至可以說,整個應用的架構都是使用中間件為基礎搭建的。那麼中間件有哪些利弊?什麼才是中間件正確的使用姿勢?本文將分享作者在實際使用中的一些想法,歡迎同學們共同討論。文末福利:下載《大促背後的前端核心業務實踐》電子書。
  • 這位「華人AI前10大牛科學家」,如何用「AI中間件」跨越人工智慧和...
    企業運營數據中包含大量的歧義和模糊性,如何處理它們和從中獲得價值,以及如何讓AI了解企業的具體業務流程,是件困難的事情。張本宇決定出來創業,做一家針對B端企業客戶的公司,將前沿的AI技術,引入國內企業的業務中。「深度學習技術發展到一定階段之後,是可以幫助AI工程師進入更多的行業,進行跨行業服務和研發的。」2015年5月,雲腦科技在矽谷成立。
  • 阿里云云原生中間件 2021 年春季校招實習啟動啦~
    我們是「阿里云云原生中間件」團隊,目前正式開始進行 2021 年春季校招實習的意向溝通,有任何問題歡迎加微信諮詢 【xiayimiaoshenghua】, 加微信備註下 「姓名+學校+期望實習城市」。團隊介紹阿里云云原生中間件團隊負責分布式軟體基礎設施,為阿里雲上萬家企業提供如微服務引擎、服務網格、消息服務等分布式基礎服務,加速企業上雲的進程和創新速度。
  • 面向個人手機用戶的IMS中間件技術,電信運營商該如何應對?
    不管基於當下的4G網絡還是未來的5G,在當前移動網際網路業務需求日益增大的背景下,電信運營商如何更好地發揮IMS網絡的能力、提升智能通道的價值來為大眾提供更好的通信服務,這是當前業界的重點研究課題之一。本文中提出一種基於IMS中間件的技術為個人移動用戶提供多樣化的網際網路業務,同時為電信運營商帶來增值收入。
  • 美國艾邁系統發布RFID中間件Trackway3.0版本
    美國艾邁系統(亞洲)有限公司近日發布了該公司產品RFID中間件系統trackway 3.0版本。該版本的中間件能夠通過快速的開發和部署完成對市場上現有的任何自動識別設備(包括RFID和條碼)的無縫連接。
  • Redux/react-redux/redux中間件設計實現剖析
    容易誤操作」比如說,有人一個不小心把store賦值了{},清空了store,或者誤修改了其他組件的數據,那顯然不太安全,出錯了也很難排查,因此我們需要「有條件地」操作store,防止使用者直接修改store的數據。
  • CRIWARE官方音頻中間件ADX2 CACSA認證培訓
    本次CRIWARE官方認證培訓是CRIWARE中國首次開展的線下認證培訓,主要針對ADX2的音頻中間件的培訓