如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步
轉載自?如何基于Canal 和 Kafka,實現 MySQL 的 Binlog 近實時同步
近段時間,業務系統架構基本完備,數據層面的建設比較薄弱,因為筆者目前工作重心在于搭建一個小型的數據平臺。優先級比較高的一個任務就是需要近實時同步業務系統的數據(包括保存、更新或者軟刪除)到一個另一個數據源,持久化之前需要清洗數據并且構建一個相對合理的便于后續業務數據統計、標簽系統構建等擴展功能的數據模型。基于當前團隊的資源和能力,優先調研了Alibaba開源中間件Canal的使用。
這篇文章簡單介紹一下如何快速地搭建一套Canal相關的組件。
關于Canal
下面的簡介和下一節的原理均來自于Canal項目的README:
Canal[k?'n?l],譯意為水道/管道/溝渠,主要用途是基于MySQL數據庫增量日志解析,提供增量數據訂閱和消費。
?
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務trigger獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。
?
基于日志增量訂閱和消費的業務包括:
?
-
數據庫鏡像
-
數據庫實時備份
-
索引構建和實時維護(拆分異構索引、倒排索引等)
-
業務Cache刷新
-
帶業務邏輯的增量數據處理
Canal 的工作原理
MySQL主備復制原理:
-
MySQL的Master實例將數據變更寫入二進制日志(binary log,其中記錄叫做二進制日志事件binary log events,可以通過show binlog events進行查看)
-
MySQL的Slave實例將master的binary log events拷貝到它的中繼日志(relay log)
-
MySQL的Slave實例重放relay log中的事件,將數據變更反映它到自身的數據
?
Canal的工作原理如下:
?
-
Canal模擬MySQL Slave的交互協議,偽裝自己為MySQL Slave,向MySQL Master發送dump協議
-
MySQL Master收到dump請求,開始推送binary log給Slave(即Canal)
-
Canal解析binary log對象(原始為byte流),并且可以通過連接器發送到對應的消息隊列等中間件中
關于Canal的版本和部件
截止筆者開始編寫本文的時候(2020-03-05),Canal的最新發布版本是v1.1.5-alpha-1(2019-10-09發布的),最新的正式版是v1.1.4(2019-09-02發布的)。
?
其中,v1.1.4主要添加了鑒權、監控的功能,并且做了一些列的性能優化,此版本集成的連接器是Tcp、Kafka和RockerMQ。而v1.1.5-alpha-1版本已經新增了RabbitMQ連接器,但是此版本的RabbitMQ連接器暫時不能定義連接RabbitMQ的端口號,不過此問題已經在master分支中修復(具體可以參看源碼中的CanalRabbitMQProducer類的提交記錄)。
?
換言之,v1.1.4版本中目前能使用的內置連接器只有Tcp、Kafka和RockerMQ三種,如果想嘗鮮使用RabbitMQ連接器,可以選用下面的兩種方式之一:
?
-
選用v1.1.5-alpha-1版本,但是無法修改RabbitMQ的port屬性,默認為5672。
-
基于master分支自行構建Canal。
?
目前,Canal項目的活躍度比較高,但是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前可以選用v1.1.4版本,本文的例子用選用的就是v1.1.4版本,配合Kafka連接器使用。Canal主要包括三個核心部件:
?
-
canal-admin:后臺管理模塊,提供面向WebUI的Canal管理能力。
-
canal-adapter:適配器,增加客戶端數據落地的適配及啟動功能,包括REST、日志適配器、關系型數據庫的數據同步(表對表同步)、HBase數據同步、ES數據同步等等。
-
canal-deployer:發布器,核心功能所在,包括binlog解析、轉換和發送報文到連接器中等等功能都由此模塊提供。
一般情況下,canal-deployer部件是必須的,其他兩個部件按需選用即可。
部署所需的中間件
搭建一套可以用的組件需要部署MySQL、Zookeeper、Kafka和Canal四個中間件的實例,下面簡單分析一下部署過程。選用的虛擬機系統是CentOS7。
安裝MySQL
為了簡單起見,選用yum源安裝(官方鏈接是https://dev.mysql.com/downloads/repo/yum):
mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉庫
選用的是最新版的MySQL8.x社區版,下載CentOS7適用的rpm包:
cd?/data/mysql wget?https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm //?下載完畢之后 sudo?rpm?-Uvh?mysql80-community-release-el7-3.noarch.rpm此時列舉一下yum倉庫里面的MySQL相關的包:
[root@localhost?mysql]#?yum?repolist?all?|?grep?mysql mysql-cluster-7.5-community/x86_64?MySQL?Cluster?7.5?Community???disabled mysql-cluster-7.5-community-source?MySQL?Cluster?7.5?Community?-?disabled mysql-cluster-7.6-community/x86_64?MySQL?Cluster?7.6?Community???disabled mysql-cluster-7.6-community-source?MySQL?Cluster?7.6?Community?-?disabled mysql-cluster-8.0-community/x86_64?MySQL?Cluster?8.0?Community???disabled mysql-cluster-8.0-community-source?MySQL?Cluster?8.0?Community?-?disabled mysql-connectors-community/x86_64??MySQL?Connectors?Community????enabled:????141 mysql-connectors-community-source??MySQL?Connectors?Community?-??disabled mysql-tools-community/x86_64???????MySQL?Tools?Community?????????enabled:????105 mysql-tools-community-source???????MySQL?Tools?Community?-?Sourc?disabled mysql-tools-preview/x86_64?????????MySQL?Tools?Preview???????????disabled mysql-tools-preview-source?????????MySQL?Tools?Preview?-?Source??disabled mysql55-community/x86_64???????????MySQL?5.5?Community?Server????disabled mysql55-community-source???????????MySQL?5.5?Community?Server?-??disabled mysql56-community/x86_64???????????MySQL?5.6?Community?Server????disabled mysql56-community-source???????????MySQL?5.6?Community?Server?-??disabled mysql57-community/x86_64???????????MySQL?5.7?Community?Server????disabled mysql57-community-source???????????MySQL?5.7?Community?Server?-??disabled mysql80-community/x86_64???????????MySQL?8.0?Community?Server????enabled:????161 mysql80-community-source???????????MySQL?8.0?Community?Server?-??disabled編輯/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]塊中enabled設置為1,其實默認就是這樣子,不用改,如果要選用5.x版本則需要修改對應的塊):
[mysql80-community] name=MySQL?8.0?Community?Server baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/ enabled=1 gpgcheck=1 gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql然后安裝MySQL服務:
sudo?yum?install?mysql-community-server這個過程比較漫長,因為需要下載和安裝5個rpm安裝包(或者是所有安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)。如果網絡比較差,也可以直接從官網手動下載后安裝:
//?下載下面5個rpm包?common?-->?libs?-->?libs-compat?-->?client?-->?server mysql-community-common mysql-community-libs mysql-community-libs-compat mysql-community-client mysql-community-server//?強制安裝 rpm?-ivh?mysql-community-common-8.0.18-1.el7.x86_64.rpm?--force?--nodeps rpm?-ivh?mysql-community-libs-8.0.18-1.el7.x86_64.rpm?--force?--nodeps rpm?-ivh?mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm?--force?--nodeps rpm?-ivh?mysql-community-client-8.0.18-1.el7.x86_64.rpm?--force?--nodeps rpm?-ivh?mysql-community-server-8.0.18-1.el7.x86_64.rpm?--force?--nodeps安裝完畢之后,啟動MySQL服務,然后搜索MySQL服務的root賬號的臨時密碼用于首次登陸(mysql -u root -p):
//?啟動服務,關閉服務就是service?mysqld?stop service?mysqld?start //?查看臨時密碼?cat?/var/log/mysqld.log [root@localhost?log]#?cat?/var/log/mysqld.log? 2020-03-02T06:03:53.996423Z?0?[System]?[MY-013169]?[Server]?/usr/sbin/mysqld?(mysqld?8.0.18)?initializing?of?server?in?progress?as?process?22780 2020-03-02T06:03:57.321447Z?5?[Note]?[MY-010454]?[Server]?A?temporary?password?is?generated?for?root@localhost:?>kjYaXENK6li 2020-03-02T06:04:00.123845Z?0?[System]?[MY-010116]?[Server]?/usr/sbin/mysqld?(mysqld?8.0.18)?starting?as?process?22834 //?登錄臨時root用戶,使用臨時密碼 [root@localhost?log]#?mysql?-u?root?-p接下來做下面的操作:
-
修改root用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密碼規則必須包含大小寫字母、數字和特殊字符)
-
更新root的host,切換數據庫use mysql;,指定host為%以便可以讓其他服務器遠程訪問UPDATE USER SET HOST = '%' WHERE USER = 'root';
-
賦予'root'@'%'用戶,所有權限,執行GRANT ALL PRIVILEGES ON?.?TO 'root'@'%';
-
改變root'@'%用戶的密碼校驗規則以便可以使用Navicat等工具訪問:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
操作完成之后,就可以使用root用戶遠程訪問此虛擬機上的MySQL服務。最后確認是否開啟了binlog(注意一點是MySQL8.x默認開啟binlog)SHOW VARIABLES LIKE '%bin%';:
最后在MySQL的Shell執行下面的命令,新建一個用戶名canal密碼為QWqw12!@的新用戶,賦予REPLICATION SLAVE和 REPLICATION CLIENT權限:
CREATE?USER?canal?IDENTIFIED?BY?'QWqw12!@'; GRANT?SELECT,?REPLICATION?SLAVE,?REPLICATION?CLIENT?ON?*.*?TO?'canal'@'%'; FLUSH?PRIVILEGES; ALTER?USER?'canal'@'%'?IDENTIFIED?WITH?mysql_native_password?BY?'QWqw12!@';切換回去root用戶,創建一個數據庫test:
CREATE?DATABASE?`test`?CHARSET?`utf8mb4`?COLLATE?`utf8mb4_unicode_ci`;安裝Zookeeper
Canal和Kafka集群都依賴于Zookeeper做服務協調,為了方便管理,一般會獨立部署Zookeeper服務或者Zookeeper集群。筆者這里選用2020-03-04發布的3.6.0版本:
?midkr?/data/zk
#?創建數據目錄
midkr?/data/zk/data
cd?/data/zk
wget?http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar?-zxvf?apache-zookeeper-3.6.0-bin.tar.gz
cd?apache-zookeeper-3.6.0-bin/conf
cp?zoo_sample.cfg?zoo.cfg?&&?vim?zoo.cfg
把zoo.cfg文件中的dataDir設置為/data/zk/data,然后啟動Zookeeper:
?[root@localhost?conf]#?sh?/data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh?start
/usr/bin/java
ZooKeeper?JMX?enabled?by?default
Using?config:?/data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting?zookeeper?...?STARTED
這里注意一點,要啟動此版本的Zookeeper服務必須本地安裝好JDK8+,這一點需要自行處理。啟動的默認端口是2181,啟動成功后的日志如下:
安裝Kafka
Kafka是一個高性能分布式消息隊列中間件,它的部署依賴于Zookeeper。筆者在此選用2.4.0并且Scala版本為2.13的安裝包:
?mkdir?/data/kafka
mkdir?/data/kafka/data
wget?http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar?-zxvf?kafka_2.13-2.4.0.tgz
由于解壓后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合需要,不必修改,需要修改日志文件的目錄log.dirs為/data/kafka/data。然后啟動Kafka服務:
?sh?/data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh?/data/kafka/kafka_2.13-2.4.0/config/server.properties
這樣啟動一旦退出控制臺就會結束Kafka進程,可以添加-daemon參數用于控制Kafka進程后臺不掛斷運行。
?sh?/data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh?-daemon?/data/kafka/kafka_2.13-2.4.0/config/server.properties
安裝和使用Canal
終于到了主角登場,這里選用Canal的v1.1.4穩定發布版,只需要下載deployer模塊:
?mkdir?/data/canal
cd?/data/canal
#?這里注意一點,Github在國內被墻,下載速度極慢,可以先用其他下載工具下載完再上傳到服務器中
wget?https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar?-zxvf?canal.deployer-1.1.4.tar.gz
解壓后的目錄如下:
?-?bin???#?運維腳本
-?conf??#?配置文件
??canal_local.properties??#?canal本地配置,一般不需要動
??canal.properties????????#?canal服務配置
??logback.xml?????????????#?logback日志配置
??metrics?????????????????#?度量統計配置
??spring??????????????????#?spring-實例配置,主要和binlog位置計算、一些策略配置相關,可以在canal.properties選用其中的任意一個配置文件
??example?????????????????#?實例配置文件夾,一般認為單個數據庫對應一個獨立的實例配置文件夾
????instance.properties???#?實例配置,一般指單個數據庫的配置
-?lib???#?服務依賴包
-?logs??#?日志文件輸出目錄
在開發和測試環境建議把logback.xml的日志級別修改為DEBUG方便定位問題。這里需要關注canal.properties和instance.properties兩個配置文件。canal.properties文件中,需要修改:
-
去掉canal.instance.parser.parallelThreadSize = 16這個配置項的注釋,也就是啟用此配置項,和實例解析器的線程數相關,不配置會表現為阻塞或者不進行解析。
-
canal.serverMode配置項指定為kafka,可選值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以選用rabbitmq),默認是kafka。
-
canal.mq.servers配置需要指定為Kafka服務或者集群Broker的地址,這里配置為127.0.0.1:9092。
canal.mq.servers在不同的canal.serverMode有不同的意義。
kafka模式下,指Kafka服務或者集群Broker的地址,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下,指RabbitMQ服務的Host和Port
其他配置項可以參考下面兩個官方Wiki的鏈接:
-
Canal-Kafka-RocketMQ-QuickStart
-
AdminGuide
instance.properties一般指一個數據庫實例的配置,Canal架構支持一個Canal服務實例,處理多個數據庫實例的binlog異步解析。instance.properties需要修改的配置項主要包括:
-
canal.instance.mysql.slaveId需要配置一個和Master節點的服務ID完全不同的值,這里筆者配置為654321。
-
配置數據源實例,包括地址、用戶、密碼和目標數據庫:
-
canal.instance.master.address,這里指定為127.0.0.1:3306。
-
canal.instance.dbUsername,這里指定為canal。
-
canal.instance.dbPassword,這里指定為QWqw12!@。
-
新增canal.instance.defaultDatabaseName,這里指定為test(需要在MySQL中建立一個test數據庫,見前面的流程)。
-
-
Kafka相關配置,這里暫時使用靜態topic和單個partition:
-
canal.mq.topic,這里指定為test,也就是解析完的binlog結構化數據會發送到Kafka的命名為test的topic中。
-
canal.mq.partition,這里指定為0。
-
配置工作做好之后,可以啟動Canal服務:
?sh?/data/canal/bin/startup.sh?
#?查看服務日志
tail?-100f?/data/canal/logs/canal/canal
#?查看實例日志??--?一般情況下,關注實例日志即可
tail?-100f?/data/canal/logs/example/example.log
啟動正常后,見實例日志如下:
在test數據庫創建一個訂單表,并且執行幾個簡單的DML:
?use?`test`;
CREATE?TABLE?`order`
(
????id??????????BIGINT?UNIQUE?PRIMARY?KEY?AUTO_INCREMENT?COMMENT?'主鍵',
????order_id????VARCHAR(64)????NOT?NULL?COMMENT?'訂單ID',
????amount??????DECIMAL(10,?2)?NOT?NULL?DEFAULT?0?COMMENT?'訂單金額',
????create_time?DATETIME???????NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?COMMENT?'創建時間',
????UNIQUE?uniq_order_id?(`order_id`)
)?COMMENT?'訂單表';
INSERT?INTO?`order`(order_id,?amount)?VALUES?('10086',?999);
UPDATE?`order`?SET?amount?=?10087?WHERE?order_id?=?'10086';
DELETE??FROM?`order`?WHERE?order_id?=?'10086';
這個時候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test這個topic的數據:
?sh?/data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh?--bootstrap-server?127.0.0.1:9092?--from-beginning?--topic?test
具體的數據如下:
?//?test數據庫建庫腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE?DATABASE?`test`?CHARSET?`utf8mb4`?COLLATE?`utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}
//?order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE?TABLE?`order`\n(\n????id??????????BIGINT?UNIQUE?PRIMARY?KEY?AUTO_INCREMENT?COMMENT?'主鍵',\n????order_id????VARCHAR(64)????NOT?NULL?COMMENT?'訂單ID',\n????amount??????DECIMAL(10,?2)?NOT?NULL?DEFAULT?0?COMMENT?'訂單金額',\n????create_time?DATETIME???????NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?COMMENT?'創建時間',\n????UNIQUE?uniq_order_id?(`order_id`)\n)?COMMENT?'訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}
//?INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02?05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
//?UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02?05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}
//?DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02?05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}
可見Kafka的名為test的topic已經寫入了對應的結構化binlog事件數據,可以編寫消費者監聽Kafka對應的topic然后對獲取到的數據進行后續處理。
小結
這篇文章大部分篇幅用于介紹其他中間件是怎么部署的,這個問題側面說明了Canal本身部署并不復雜,它的配置文件屬性項比較多,但是實際上需要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本并不高。后面會分析基于結構化binlog事件做ELT和持久化相關工作以及Canal的生產環境可用級別HA集群的搭建。
?
總結
以上是生活随笔為你收集整理的如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (linux root设置)
- 下一篇: 面试官问我:Redis 内存满了怎么办