Real-time MySQL data synchronization: installing and deploying Canal, Kafka, ZooKeeper, and MySQL; Canal Server + Canal Client HA; Canal + MySQL + Kafka; with verification (study notes)


Table of Contents

  • Canal installation and deployment
    1.1. Server preparation
    1.2. Set hostnames and configure hosts
    1.3. Passwordless SSH setup
    1.4. Configure NTP time synchronization
    1.5. Disable the firewall
    1.6. Disable SELinux
    1.7. Install the JDK
    1.8. Install ZooKeeper
    1.9. Install Scala
  • Install Kafka
    2.1. Extract the archive
    2.2. Configure environment variables
    2.3. Modify configuration files
    2.4. Further changes to server.properties
    2.5. Create the log directory
    2.6. Start and test the Kafka cluster
    2.7. Produce and consume topic data
    2.8. Kafka cluster monitoring with KafkaOffsetMonitor (legacy approach)
    2.9. Kafka cluster monitoring with KafkaCenter
    2.9.1. Download
    2.9.2. Initialization
    2.9.3. Edit the application.properties file
    2.9.4. Build and run
  • Install MySQL
    3.1. Remove the existing MySQL
    3.2. Create the canal account
    3.3. Enable binlog writing
  • Canal quick installation and deployment
    4.1. Machine preparation
    4.2. Download Canal
    4.3. Extract the archive
    4.4. Modify the configuration file
    4.5. Create the example topic
    4.6. Start the Canal service
    4.7. Verify functionality
    4.8. Prepare database test data
    4.9. ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
    4.10. Data monitoring microservice
  • Canal Server + Canal Client HA
    5.1. Machine preparation
    5.2. Download Canal
    5.3. Extract the archive
    5.4. Modify configuration files
    5.4.1. Modify canal.properties
    5.4.2. Modify instance.properties
    5.4.3. Configuration of the other canal server
    5.4.4. Start the ZooKeeper service
    5.4.5. Start the Canal services (start both canal servers)
    5.4.6. Client connection and data consumption
  • MySQL + Canal + Kafka integration
    6.1. Machine preparation
    6.2. Download Canal
    6.3. Extract the archive
    6.4. Modify configuration files
    6.4.1. Modify instance.properties
    6.4.2. Modify canal.properties
    6.5. Start related services
    6.5.1. Start the ZooKeeper service
    6.5.2. Start the Kafka service
    6.5.3. Open a Kafka consumer
    6.5.4. Start the Canal service
    6.5.5. Observe the Kafka consumer
    1. Canal installation and deployment

    1.1. Server preparation

    IP               | Hostname | OS                                   | Components
    192.168.106.103  | node1    | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (master), canal (standalone)
    192.168.106.104  | node2    | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), canal-ha (master)
    192.168.106.105  | node3    | CentOS Linux release 7.4.1708 (Core) | ZooKeeper, Kafka (slave), canal-ha (slave)

    1.2. Set hostnames and configure hosts

    On each of the three machines, edit /etc/hostname with vim /etc/hostname and set the hostname to node1, node2, and node3 respectively.
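
    Alternatively, a hedged sketch using hostnamectl (CentOS 7 is systemd-based, so this persists the name without editing the file by hand):

    # Run on each node with its own name; node1 is shown here
    hostnamectl set-hostname node1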

    Then configure the hosts file; the content is as follows:

    [root@node1 ~]# vim /etc/hosts
    192.168.106.103 node1
    192.168.106.104 node2
    192.168.106.105 node3

    1.3. Passwordless SSH setup

    Run the following on each of the three machines:

    ssh-keygen -t rsa
    ssh-copy-id node1
    ssh-copy-id node2
    ssh-copy-id node3

    1.4. Configure NTP time synchronization

    Reference: https://blog.csdn.net/tototuzuoquan/article/details/108900206

    1.5.關閉防火墻

    systemctl status firewalld.service # 查看防火墻的狀態 systemctl stop firewalld.service # 關閉防火墻 systemctl disable firewalld.service # 設置開機不啟動 systemctl is-enabled firewalld.service # 查看防火墻服務是否設置開機啟動

    1.6. Disable SELinux

    https://www.linuxidc.com/Linux/2016-11/137723.htm

    1.7. Install the JDK

    Extract the JDK on each of the three machines, then configure the environment variables, for example:

    export JAVA_HOME=/root/jdk1.8.0_161
    export JRE_HOME=$JAVA_HOME/jre
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib/rt.jar
    export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

    Then run source /etc/profile on each machine.

    1.8. Install ZooKeeper

    Reference: https://blog.csdn.net/tototuzuoquan/article/details/54003140
    The content of zoo.cfg is as follows:

    # The number of milliseconds of each tick
    tickTime=5000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/root/apache-zookeeper-3.6.2-bin/data
    dataLogDir=/root/apache-zookeeper-3.6.2-bin/log
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

    ## Metrics Providers
    #
    # https://prometheus.io Metrics Exporter
    #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    #metricsProvider.httpPort=7000
    #metricsProvider.exportJvmInfo=true

    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888

    Copy the ZooKeeper directory above to node1, node2, and node3 (see the scp sketch below).
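
    For example, a minimal scp sketch (assuming ZooKeeper was prepared under /root/apache-zookeeper-3.6.2-bin on node1):

    scp -r /root/apache-zookeeper-3.6.2-bin root@node2:/root/
    scp -r /root/apache-zookeeper-3.6.2-bin root@node3:/root/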

    On node1, node2, and node3, go into /root/apache-zookeeper-3.6.2-bin/data and run:

    echo 1 > myid   # run on node1
    echo 2 > myid   # run on node2
    echo 3 > myid   # run on node3

    Then run the following on node1, node2, and node3:

    # start zk
    $ZOOKEEPER_HOME/bin/zkServer.sh start
    # check zk status
    $ZOOKEEPER_HOME/bin/zkServer.sh status

    1.9. Install Scala

    This part is omitted.

    Configure the environment variables:

    export SCALA_HOME=/root/scala-2.12.12
    export PATH=$PATH:$SCALA_HOME/bin

    2. Install Kafka

    2.1. Extract the archive

    Extract the Kafka package with the following command:

    tar -zxvf kafka_2.12-2.6.0.tgz

    Then remove the Kafka archive:

    rm -rf kafka_2.12-2.6.0.tgz

    2.2. Configure environment variables

    The environment variables are as follows:

    export SCALA_HOME=/root/scala-2.12.12
    export PATH=$PATH:$SCALA_HOME/bin

    export KAFKA_HOME=/root/kafka_2.12-2.6.0
    export PATH=$PATH:$KAFKA_HOME/bin

    Then run: source /etc/profile

    2.3. Modify configuration files

    cd $KAFKA_HOME/config

    1. Modify zookeeper.properties
    [root@node1 config]# vim zookeeper.properties
    # ZooKeeper data directory; keep it consistent with the ZooKeeper configuration
    dataDir=/root/apache-zookeeper-3.6.2-bin/data

    2. Modify consumer.properties
    [root@node1 config]# vim consumer.properties
    # ZooKeeper cluster connection addresses
    zookeeper.connect=node1:2181,node2:2181,node3:2181

    3. Modify producer.properties
    [root@node1 config]# vim producer.properties
    # Kafka cluster addresses
    bootstrap.servers=node1:9092,node2:9092,node3:9092

    4. Modify server.properties
    [root@node1 config]# vim server.properties
    # ZooKeeper cluster addresses
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    # directory for the data (log) files; this path can be changed
    log.dirs=/tmp/kafka-logs

    Copy the Kafka directory to the other nodes (run on node1):
    [root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node2:$PWD
    [root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node3:$PWD

    2.4. Further changes to server.properties

    Modify server.properties on each node:

    # on node1
    broker.id=1
    # on node2
    broker.id=2
    # on node3
    broker.id=3

    2.5. Create the log directory

    Run on all three machines:

    mkdir -p /tmp/kafka-logs   # /tmp/kafka-logs is the log.dirs path configured above

    2.6. Start and test the Kafka cluster

    1. Start the ZooKeeper cluster (run on all 3 nodes):

    $ZOOKEEPER_HOME/bin/zkServer.sh start

    2. Start the Kafka cluster:

    # start kafka
    cd $KAFKA_HOME
    bin/kafka-server-start.sh -daemon config/server.properties

    3. List the topics:

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

    4. Create a topic:

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 2 --partitions 2

    Then list the topics again:

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
    test

    5. Describe the topic:

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
    test

    2.7. Produce and consume topic data

    1. Using the new API
    On node2, use the bundled script to consume data from the topic:

    [root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    On node1, use the bundled script to produce data to the topic:

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    On node3, use the bundled script to consume data from the topic (only new data from this point on):

    [root@node3 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    Consume the topic data again, this time from the beginning:

    [root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    輸入數據
    adfasdasfd
    輸入測試3
    shuru
    輸入測試2

    To inspect consumed data you must specify a consumer group. List the Kafka consumer groups with the following command:

    [root@node2 kafka_2.12-2.6.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    console-consumer-21382

    Check the per-partition consumption status of a topic (note: the --new-consumer flag has been removed in recent Kafka releases, so with kafka_2.12-2.6.0 use the second form):

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group xxx --describe
    bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group xxx

    Field descriptions (an illustrative describe output is sketched below):
    GROUP: the consumer group
    TOPIC: the topic that was or is being consumed
    PARTITION: the partition number
    CURRENT-OFFSET: the last offset committed by the consumer group
    LOG-END-OFFSET: the offset of the latest produced message
    LAG: the difference between the consumed offset and the produced offset
    CONSUMER-ID: the id of the group member currently consuming the topic-partition
    HOST: the IP address of the consuming node
    CLIENT-ID: the client id
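
    For reference, a hedged sketch of the describe command and its typical output layout (the group name comes from the listing above; the offsets, ids, and host below are illustrative, not captured from this environment):

    bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group console-consumer-21382
    # GROUP                    TOPIC   PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   CONSUMER-ID      HOST               CLIENT-ID
    # console-consumer-21382   test    0           3                3                0     consumer-1-...   /192.168.106.105   consumer-1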

    2.8. Kafka cluster monitoring with KafkaOffsetMonitor (legacy approach)

    KafkaOffsetMonitor is a tool for monitoring Kafka topics and consumer consumption status. It runs as a single jar, so it is easy to deploy, and because it only provides monitoring it is fairly safe to use.
    What it does:
    1) Monitors the Kafka cluster state and the lists of topics and consumer groups.
    2) Graphically shows the relationship between topics and consumers.
    3) Graphically shows consumer offsets, lag, and other information.
    1. Download
    Download: https://github.com/quantifind/KafkaOffsetMonitor (a patched build can also be used)
    Currently KafkaOffsetMonitor can only monitor consumers that use the old API; the new API is not yet supported.
    2. Script parameter format

    java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
      com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      --zk node1:2181,node2:2181,node3:2181 \
      --port 8090 \
      --refresh 10.seconds \
      --retain 2.days

    zk: ZooKeeper cluster address
    port: port for the web UI
    refresh: refresh interval
    retain: data retention time (units: seconds, minutes, hours, days)
    3. Create the kafkamonitor.sh launch script
    vi kafkamonitor.sh

    #!/bin/sh
    home=$(cd `dirname $0`; cd ..; pwd)
    . ${home}/bin/common.sh
    java -cp ${lib_home}KafkaOffsetMonitor-assembly-0.2.0.jar \
      com.quantifind.kafka.offsetapp.OffsetGetterWeb \
      --zk node1:2181,node2:2181,node3:2181 \
      --port 8090 \
      --refresh 10.seconds \
      --retain 2.days > /dev/null 2>&1 &

    4. Grant execute permission
    Make kafkamonitor.sh executable:

    chmod u+x kafkamonitor.sh

    5. Start the monitoring script:

    bin/kafkamonitor.sh

    6. Web UI:

    node1:8090

    2.9. Kafka cluster monitoring with KafkaCenter

    GitHub: https://github.com/xaecbd/KafkaCenter (download the KafkaCenter package)
    Gitee mirror: https://gitee.com/newegg/KafkaCenter

    2.9.1. Download

    git clone https://github.com/xaecbd/KafkaCenter.git

    2.9.2. Initialization

    Run the SQL script KafkaCenter-master\KafkaCenter-Core\sql\table_script.sql against MySQL (see the sketch below).
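
    For example, a hedged sketch of loading the script with the mysql client (the kafka_center database name and the script path are assumptions based on the default KafkaCenter layout):

    mysql -uroot -p -e "CREATE DATABASE IF NOT EXISTS kafka_center DEFAULT CHARACTER SET utf8;"
    mysql -uroot -p kafka_center < KafkaCenter-master/KafkaCenter-Core/sql/table_script.sql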

    2.9.3. Edit the application.properties file

    The file is located at: KafkaCenter/KafkaCenter-Core/src/main/resources/application.properties

    The main change is the database connection password.
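
    A hedged way to locate the lines to edit (the spring.datasource.* key names are an assumption based on standard Spring Boot conventions):

    # Find the datasource url/username/password entries, then point them at your MySQL instance
    grep -n "spring.datasource" KafkaCenter/KafkaCenter-Core/src/main/resources/application.properties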

    2.9.4. Build and run

    Note: make sure the installed JDK is JDK 8 or later.

    $ git clone https://github.com/xaecbd/KafkaCenter.git   # already done above
    $ cd KafkaCenter
    $ mvn clean package -Dmaven.test.skip=true
    $ cd KafkaCenter\KafkaCenter-Core\target
    $ java -jar KafkaCenter-Core-2.3.0-SNAPSHOT.jar

    3. Install MySQL

    3.1. Remove the existing MySQL

    MySQL can be installed following https://blog.csdn.net/tototuzuoquan/article/details/104210148; remove any pre-installed MySQL first (a removal sketch follows).
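
    The removal commands are not listed in that reference; a hedged sketch for CentOS 7 (package names vary per installation, so check the query output first):

    # Check for pre-installed MySQL/MariaDB packages
    rpm -qa | grep -iE "mysql|mariadb"
    # Remove the packages found above (mariadb-libs ships with CentOS 7 by default)
    yum remove -y mariadb-libs
    # Optionally clean up leftover data and config from an old installation
    rm -rf /var/lib/mysql /etc/my.cnf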

    3.2. Create the canal account

    mysql -uroot -p
    Enter password: 123456

    mysql> create user 'canal' identified by 'canal';
    Query OK, 0 rows affected (0.00 sec)

    mysql> grant all privileges on *.* to 'canal'@'%' identified by 'canal';
    Query OK, 0 rows affected (0.00 sec)

    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)

    mysql>

    3.3. Enable binlog writing

    For a self-hosted MySQL you first need to enable binlog writing and set binlog-format to ROW mode. Configure my.cnf as follows:

    [root@node1 etc]# vim /etc/my.cnf

    [mysqld]
    log-bin=mysql-bin    # enable binlog
    binlog-format=ROW    # use ROW mode
    server_id=1          # required for MySQL replication; must not clash with canal's slaveId

    Restart MySQL:

    [root@node1 etc]# systemctl restart mysqld

    Then create the test database:

    create database test default character set utf8;
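
    To confirm that the binlog is actually enabled and in ROW format before moving on, a quick hedged check:

    mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; SHOW MASTER STATUS;"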

    4. Canal quick installation and deployment

    Official repository: https://github.com/alibaba/canal

    4.1. Machine preparation

    Canal server: node1
    MySQL address: node1

    4.2. Download Canal

    Download page: https://github.com/alibaba/canal/releases
    The main artifacts are:

    https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
    https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz   (this is the one used here)
    https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.example-1.1.4.tar.gz
    https://github.com/alibaba/canal/archive/canal-1.1.4.zip
    https://github.com/alibaba/canal/archive/canal-1.1.4.tar.gz

    4.3. Extract the archive

    mkdir -p /root/canal
    tar zxvf canal.deployer-1.1.4.tar.gz -C /root/canal

    After extraction, go into /root/canal; the layout looks like this:

    [root@node1 canal]# pwd
    /root/canal
    [root@node1 canal]# ls
    bin  conf  lib  logs
    [root@node1 canal]#

    4.4. Modify the configuration file

    [root@node1 canal]# cd conf/example/
    [root@node1 example]# ls
    instance.properties

    [root@node1 example]# vim instance.properties
    The content is:

    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=1234

    ## position info -- change this to your own database address
    canal.instance.master.address=node1:3306

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    canal.instance.defaultDatabaseName=test   # if this is not set, all databases are captured

    4.5. Create the example topic

    [root@node1 example]# cd $KAFKA_HOME
    [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic example --replication-factor 1 --partitions 1
    Created topic example.

    4.6. Start the Canal service

    cd /root/canal
    bin/startup.sh

    Watch the canal log:

    [root@node1 canal]# cd /root/canal/logs/canal
    [root@node1 canal]# tail -f canal.log
    2020-12-18 22:50:52.994 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2020-12-18 22:50:53.083 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2020-12-18 22:50:53.112 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2020-12-18 22:50:53.192 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.106.103(192.168.106.103):11111]
    2020-12-18 22:50:55.369 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

    4.7. Verify functionality

    Download the canal source code (canal-canal-1.1.4.zip) and open it in IDEA. After the import it looks like this (screenshot omitted).

    Open the class com.alibaba.otter.canal.example.SimpleCanalClientPermanceTest and change the IP address to 192.168.106.103.

    4.8. Prepare database test data

    Import the stu.sql table data into the database on the MySQL node; you can then insert, delete, or update rows in the stu table. The contents of stu.sql are:

    create table `stu` (
      `name` varchar (60),
      `speciality` varchar (60)
    );
    insert into `stu` (`name`, `speciality`) values('張三','美術');
    insert into `stu` (`name`, `speciality`) values('張三','音樂');
    insert into `stu` (`name`, `speciality`) values('李四','籃球');
    insert into `stu` (`name`, `speciality`) values('小明','美術');
    insert into `stu` (`name`, `speciality`) values('李四','美術');
    insert into `stu` (`name`, `speciality`) values('小明','音樂');

    After inserting, updating, or deleting data, observe how the data changes. (You can also watch the data through the "data monitoring microservice" described below.)

    4.9.ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException

    You may hit an error like the one above during this process; for the fix, see https://blog.csdn.net/woainimax/article/details/105991825.

    4.10. Data monitoring microservice

    When a user performs database operations, the binlog is captured by canal and parsed into data, and we can then apply whatever business logic we need to the parsed result.

    Here we use an open-source project that integrates Spring Boot with canal and is more elegant than using the raw canal client:

    https://github.com/chenqian56131/spring-boot-starter-canal

    Before use, install the starter-canal module into your local Maven repository (see the sketch below).
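
    A hedged sketch of that step (the module directory name is an assumption based on the project's layout and may differ):

    git clone https://github.com/chenqian56131/spring-boot-starter-canal.git
    cd spring-boot-starter-canal/starter-canal
    mvn clean install -Dmaven.test.skip=true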

    We can follow the canal-test module it provides when writing our own code.

    (1) Create a project module changgou_canal and add the dependency to its pom (note: you can also write the code directly in the canal-test project and just add the dependency below):

    <dependency>
        <groupId>com.xpand</groupId>
        <artifactId>starter-canal</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>

    (2) Create the package com.changgou.canal and add the application class under it:

    @SpringBootApplication
    @EnableCanalClient
    public class CanalApplication {

        public static void main(String[] args) {
            SpringApplication.run(CanalApplication.class, args);
        }
    }

    (3) Add the application.properties configuration file:

    # In canal-test this line is commented out
    canal.client.instances.example.host=192.168.106.103
    # In canal-test this value is 2181
    canal.client.instances.example.port=11111
    canal.client.instances.example.batchSize=1000
    # canal.client.instances.example.zookeeperAddress=192.168.0.59:8080,192.168.0.59:8081
    # canal.client.instances.example.clusterEnabled=true

    (4) Create the package com.changgou.canal.listener and add the listener class under it:

    @CanalEventListener
    public class BusinessListener {

        @ListenPoint(schema = "test", table = {"stu"})
        public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
            System.err.println("監聽test庫,stu表數據的變化");
            rowData.getBeforeColumnsList().forEach((c) -> System.err.println("更改前數據: " + c.getName() + " :: " + c.getValue()));
            rowData.getAfterColumnsList().forEach((c) -> System.err.println("更改后數據: " + c.getName() + " :: " + c.getValue()));
        }
    }

    Test: start the data monitoring microservice, modify the stu table in the test database, and watch the console output (see the example change below).
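
    For example, a hedged change that should trigger the listener (the new value is illustrative):

    mysql -uroot -p test -e "update stu set speciality='体育' where name='李四';"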

    The effect after running is as follows (screenshot omitted):

    5. Canal Server + Canal Client HA

    The high-availability scheme for the Canal server and client relies on ZooKeeper: when the canal server and the client start, they both read information from ZooKeeper. The data structure Canal stores in ZooKeeper looks like this:

    /otter
    └── canal
        └── destinations
            └── example              # canal instance name
                ├── 1001             # canal client info
                │   ├── cursor       # current consumed mysql binlog position
                │   ├── filter       # binlog filter conditions
                │   └── running      # the canal client currently running
                ├── cluster          # canal server list
                │   └── ip:11111
                └── running          # the canal server currently running

    When they start, both the canal server and the client try to grab the corresponding running node in ZooKeeper, which guarantees that only one server and one client are active at a time; HA failover for the server and the client is likewise driven by watches on the running node.

    5.1. Machine preparation

    3-node ZooKeeper cluster: node1, node2, node3
    2 Canal server nodes: node2, node3
    MySQL node: node1

    5.2. Download Canal

    Omitted here.

    5.3. Extract the archive

    [root@node2 ~]# mkdir /root/canal-ha            # same on node2 and node3
    [root@node2 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-ha/

    Run on node2:

    [root@node2 ~]# scp -r canal-ha root@node3:$PWD

    [root@node2 canal-ha]# pwd
    /root/canal-ha
    [root@node2 canal-ha]# ls
    bin  conf  lib  logs
    [root@node2 canal-ha]#

    5.4. Modify configuration files

    5.4.1. Modify canal.properties

    [root@node2 conf]# pwd
    /root/canal-ha/conf
    [root@node2 conf]# vim canal.properties

    # ZooKeeper cluster addresses
    canal.zkServers = node1:2181,node2:2181,node3:2181

    # global spring configuration file used by instances
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    Note:
    About default-instance.xml: the store uses memory mode, while the position management that parser/sink depend on uses persistent mode; persistence is currently done mainly by writing to ZooKeeper, so the data is shared across the cluster.

    Characteristics: supports HA.
    Scenario: production environments, clustered deployments.

    5.4.2. Modify instance.properties

    # The slave id canal uses to pose as a MySQL slave; it must not clash with the MySQL server or any other slave,
    # and the two canal servers must not use the same value.
    canal.instance.mysql.slaveId = 1234

    # Address and port of the database to watch
    canal.instance.master.address=node1:3306

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal

    5.4.3. Configuration of the other canal server

    The configuration is the same as above.
    Note: the instance directory names on the two machines must be exactly the same, because HA mode is managed by instance name, and both servers must use the default-instance.xml configuration.

    5.4.4. Start the ZooKeeper service

    This part is omitted; for convenience, the start command from section 1.8 is repeated below.
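
    # Run on all three ZooKeeper nodes
    $ZOOKEEPER_HOME/bin/zkServer.sh start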

    5.4.5. Start the Canal services (start both canal servers)

    Run the following command on both nodes to start the canal service:

    bin/startup.sh

    After starting, check logs/example/example.log; the successful-startup log appears on only one of the two machines.
    On node2 you can see:

    [root@node2 logs]# pwd
    /root/canal-ha/logs
    [root@node2 logs]# ls
    canal
    [root@node2 logs]#

    On node3 you can see:

    [root@node3 logs]# pwd
    /root/canal-ha/logs
    [root@node3 logs]# ls
    canal  example
    [root@node3 logs]# cd example/
    [root@node3 example]# ls
    example.log
    [root@node3 example]# tail -f example.log -n 500
    2020-12-27 03:42:07.860 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2020-12-27 03:42:07.910 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2020-12-27 03:42:08.708 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
    2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2020-12-27 03:42:10.851 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
    2020-12-27 03:42:10.907 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2020-12-27 03:42:11.634 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2020-12-27 03:42:11.636 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2020-12-27 03:42:26.160 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=3802,serverId=1,gtid=,timestamp=1608982052000] cost : 14301ms , the next step is binlog dump

    You can also inspect the nodes in ZooKeeper to find out which canal server is currently active.

    [root@node2 bin]# pwd
    /root/apache-zookeeper-3.6.2-bin/bin
    [root@node2 bin]# ./zkCli.sh
    [zk: localhost:2181(CONNECTED) 6] get /otter/canal/destinations/example/running
    {"active":true,"address":"192.168.106.105:11111"}
    [zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
    [192.168.106.104:11111, 192.168.106.105:11111]
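
    To exercise failover, a hedged sketch: stop the currently active canal server (the deployer ships bin/stop.sh) and check the running node again; the address should switch to the surviving server.

    # On the node that currently holds the running lock (192.168.106.105 in the output above)
    cd /root/canal-ha && bin/stop.sh
    # Then, from zkCli.sh:
    get /otter/canal/destinations/example/running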

    5.4.6. Client connection and data consumption

    You can point the client directly at the ZooKeeper address and the instance name; the canal client automatically reads the running node in ZooKeeper to find the currently active server and then connects to it (we again use the example shipped with canal). The class to modify is com.alibaba.otter.canal.example.ClusterCanalClientTest:

    package com.alibaba.otter.canal.example;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;

    /**
     * Test example for cluster mode
     *
     * @version 1.0.4
     */
    public class ClusterCanalClientTest extends AbstractCanalClientTest {

        public ClusterCanalClientTest(String destination){
            super(destination);
        }

        public static void main(String args[]) {
            String destination = "example";

            // Connect to a fixed list of canal server addresses; if one server crashes, failover is supported
            // CanalConnector connector = CanalConnectors.newClusterConnector(
            //     Arrays.asList(new InetSocketAddress(
            //         AddressUtils.getHostIp(),
            //         11111)),
            //     "stability_test", "", "");

            // Discover canal server addresses dynamically via zookeeper; if one server crashes, failover is supported
            CanalConnector connector = CanalConnectors.newClusterConnector(
                "192.168.106.104:2181", destination, "canal", "canal");

            final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
            clientTest.setConnector(connector);
            clientTest.start();
            Runtime.getRuntime().addShutdownHook(new Thread() {

                public void run() {
                    try {
                        logger.info("## stop the canal client");
                        clientTest.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping canal:", e);
                    } finally {
                        logger.info("## canal client is down.");
                    }
                }
            });
        }
    }

    The output is:

    ****************************************************
    * Batch Id: [1] ,count : [2] , memsize : [81] , Time : 2020-12-27 04:26:06
    * Start : [mysql-bin.000001:3853:1608982052000(2020-12-26 19:27:32)]
    * End : [mysql-bin.000001:3903:1608982052000(2020-12-26 19:27:32)]
    ****************************************************

    ----------------> binlog[mysql-bin.000001:3853] , name[test,stu] , eventType : INSERT , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314134 ms
    name : 小明    type=varchar(60)    update=true
    speciality : 音樂    type=varchar(60)    update=true
    ----------------END ----> transaction id: 730
    ================> binlog[mysql-bin.000001:3903] , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314141ms

    After a successful connection, the canal server records information about the currently active canal client, such as the client IP and the connection port.

    [zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/running
    {"active":true,"address":"192.168.106.1:3222","clientId":1001}
    [zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/example/1001/cursor
    {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4172,"serverId":1,"timestamp":1609019073000}}

    6. MySQL + Canal + Kafka integration

    Official docs: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    6.1. Machine preparation

    ZooKeeper cluster: node1, node2, node3
    Kafka cluster: node1, node2, node3
    MySQL node: node1
    Canal server: node1

    6.2. Download Canal

    Download: https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    6.3. Extract the archive

    [root@node1 ~]# pwd
    /root
    [root@node1 ~]# mkdir canal-kafka
    [root@node1 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-kafka

    After extraction, go into /root/canal-kafka:

    [root@node1 ~]# cd canal-kafka/
    [root@node1 canal-kafka]# ls
    bin  conf  lib  logs
    [root@node1 canal-kafka]#

    6.4. Modify configuration files

    6.4.1. Modify instance.properties

    /root/canal-kafka/conf/example/instance.properties

    # position info
    canal.instance.master.address=192.168.106.103:3306

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal

    # mq config
    canal.mq.topic=test

    6.4.2. Modify canal.properties

    /root/canal-kafka/conf/canal.properties

    # tcp, kafka, RocketMQ
    canal.serverMode = kafka

    # ZooKeeper cluster addresses
    canal.zkServers = node1:2181,node2:2181,node3:2181

    # Kafka cluster addresses
    canal.mq.servers = node1:9092,node2:9092,node3:9092

    6.5. Start related services

    6.5.1. Start the ZooKeeper service

    source /etc/profile
    $ZOOKEEPER_HOME/bin/zkServer.sh start

    6.5.2. Start the Kafka service

    # start kafka
    cd $KAFKA_HOME
    bin/kafka-server-start.sh -daemon config/server.properties

    6.5.3. Open a Kafka consumer

    List the Kafka topics, then start a console consumer:

    cd $KAFKA_HOME

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper node1:2181 -list
    test

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test

    6.5.4. Start the Canal service

    [root@node1 ~]# cd canal-kafka/
    [root@node1 canal-kafka]# bin/startup.sh
    cd to /root/canal-kafka/bin for workaround relative path
    LOG CONFIGURATION : /root/canal-kafka/bin/../conf/logback.xml
    canal conf : /root/canal-kafka/bin/../conf/canal.properties
    CLASSPATH :/root/canal-kafka/bin/../conf:/root/canal-kafka/bin/../lib/zookeeper-3.4.5.jar:/root/canal-kafka/bin/../lib/zkclient-0.10.jar:/root/canal-kafka/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-core-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-context-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/snappy-java-1.1.7.1.jar:/root/canal-kafka/bin/../lib/snakeyaml-1.19.jar:/root/canal-kafka/bin/../lib/slf4j-api-1.7.12.jar:/root/canal-kafka/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_httpserver-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_hotspot-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_common-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient-0.4.0.jar:/root/canal-kafka/bin/../lib/scala-reflect-2.11.12.jar:/root/canal-kafka/bin/../lib/scala-logging_2.11-3.8.0.jar:/root/canal-kafka/bin/../lib/scala-library-2.11.12.jar:/root/canal-kafka/bin/../lib/rocketmq-srvutil-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-remoting-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-logging-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-common-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-client-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-acl-4.5.2.jar:/root/canal-kafka/bin/../lib/protobuf-java-3.6.1.jar:/root/canal-kafka/bin/../lib/oro-2.0.8.jar:/root/canal-kafka/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/root/canal-kafka/bin/../lib/netty-all-4.1.6.Final.jar:/root/canal-kafka/bin/../lib/netty-3.2.2.Final.jar:/root/canal-kafka/bin/../lib/mysql-connector-java-5.1.47.jar:/root/canal-kafka/bin/../lib/metrics-core-2.2.0.jar:/root/canal-kafka/bin/../lib/lz4-java-1.4.1.jar:/root/canal-kafka/bin/../lib/logback-core-1.1.3.jar:/root/canal-kafka/bin/../lib/logback-classic-1.1.3.jar:/root/canal-kafka/bin/../lib/kafka-clients-1.1.1.jar:/root/canal-kafka/bin/../lib/kafka_2.11-1.1.1.jar:/root/canal-kafka/bin/../lib/jsr305-3.0.2.jar:/root/canal-kafka/bin/../lib/jopt-simple-5.0.4.jar:/root/canal-kafka/bin/../lib/jctools-core-2.1.2.jar:/root/canal-kafka/bin/../lib/jcl-over-slf4j-1.7.12.jar:/root/canal-kafka/bin/../lib/javax.annotation-api-1.3.2.jar:/root/canal-kafka/bin/../lib/jackson-databind-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-core-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-annotations-2.9.0.jar:/root/canal-kafka/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/root/canal-kafka/bin/../lib/httpcore-4.4.3.jar:/root/canal-kafka/bin/../lib/httpclient-4.5.1.jar:/root/canal-kafka/bin/../lib/h2-1.4.196.jar:/root/canal-kafka/bin/../lib/guava-18.0.jar:/root/canal-kafka/bin/../lib/fastsql-2.0.0_preview_973.jar:/root/canal-kafka/bin/../lib/fastjson-1.2.58.jar:/root/canal-kafka/bin/../lib/druid-1.1.9.jar:/root/canal-kafka/bin/../lib/disruptor-3.4.2.jar:/root/canal-kafka/bin/../lib/commons-logging-1.1.3.jar:/root/canal-kafka/bin/../lib/commons-lang3-3.4.jar:/root/canal-kafka/bin/../lib/commons-lang-2.6.jar:/root/canal-kafka/bin/../lib/commons-io-2.4.jar:/root/canal-kafka/bin/../lib/commons-compress-1.9.jar:/root/canal-kafka/bin/../lib/commons-codec-1.9.jar:/root/canal-kafka/bin/../lib/commons-cli-1.2.jar:/root/canal-kafka/bin/../lib/commons-beanutils-1.8.2.jar:/root/canal-kafka/bin/../lib/canal.store-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.sink-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.server-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.protocol-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.prometheus-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.driver-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.dbsync-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.meta-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.spring-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.manager-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.core-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.filter-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.deployer-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.common-1.1.4.jar:/root/canal-kafka/bin/../lib/aviator-2.2.1.jar:/root/canal-kafka/bin/../lib/aopalliance-1.0.jar:.:/root/jdk1.8.0_161/lib/dt.jar:/root/jdk1.8.0_161/lib/tools.jar:/root/jdk1.8.0_161/jre/lib/rt.jar
    cd to /root/canal-kafka for continue
    [root@node1 canal-kafka]#

    6.5.5. Observe the Kafka consumer

    The first time canal starts, if the MySQL binlog already contains data it is collected straight into the Kafka cluster and printed on the Kafka consumer console.

    [root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
    {"data":null,"database":"`kafka_center`","es":1609021630000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping database structure for kafka_center\r\nCREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */","sqlType":null,"table":"","ts":1609079707068,"type":"QUERY"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping structure for table kafka_center.alert_group\r\nCREATE TABLE IF NOT EXISTS `alert_group` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `threshold` int(11) DEFAULT NULL,\r\n `dispause` int(11) DEFAULT NULL,\r\n `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_date` datetime DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `disable_alerta` tinyint(1) DEFAULT 0,\r\n `enable` tinyint(1) NOT NULL DEFAULT 1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"alert_group","ts":1609079707071,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.cluster_info\r\nCREATE TABLE IF NOT EXISTS `cluster_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL,\r\n `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `enable` int(11) DEFAULT NULL,\r\n `broker_size` int(4) DEFAULT 0,\r\n `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',\r\n `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"cluster_info","ts":1609079707071,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.ksql_info\r\nCREATE TABLE IF NOT EXISTS `ksql_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) DEFAULT NULL,\r\n `cluster_name` varchar(255) DEFAULT NULL,\r\n `ksql_url` varchar(255) DEFAULT NULL,\r\n `ksql_serverId` varchar(255) DEFAULT NULL,\r\n `version` varchar(255) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"ksql_info","ts":1609079707071,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.task_info\r\nCREATE TABLE IF NOT EXISTS `task_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `message_rate` int(50) DEFAULT NULL,\r\n `ttl` int(11) DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `approved` int(11) DEFAULT NULL,\r\n `approved_id` int(11) DEFAULT NULL,\r\n `approved_time` datetime DEFAULT NULL,\r\n `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"task_info","ts":1609079707071,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.team_info\r\nCREATE TABLE IF NOT EXISTS `team_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `alarm_group` varchar(255) COLLATE utf8_bin DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"team_info","ts":1609079707071,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_collection\r\nCREATE TABLE IF NOT EXISTS `topic_collection` (\r\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `user_id` int(11) NOT NULL,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_collection","ts":1609079707072,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_info\r\nCREATE TABLE IF NOT EXISTS `topic_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `ttl` bigint(11) DEFAULT NULL,\r\n `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `file_size` bigint(20) NOT NULL DEFAULT -1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_info","ts":1609079707072,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_info\r\nCREATE TABLE IF NOT EXISTS `user_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',\r\n `create_time` datetime DEFAULT NULL,\r\n `password` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_info","ts":1609079707072,"type":"CREATE"}
    {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_team\r\nCREATE TABLE IF NOT EXISTS `user_team` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `user_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_team","ts":1609079707072,"type":"CREATE"}

    You can now delete, update, and insert data in MySQL, and the Kafka consumer console will consume the binlog data in real time.
    Insert a row into the stu table:

    insert into `stu` (`name`, `speciality`) values('田七','語文');

    Watch the consumer; the newly added record looks like this:

    {"data":[{"name":"田七","speciality":"語文"}],"database":"test","es":1609080224000,"id":2,"isDdl":false,"mysqlType":{"name":"varchar(60)","speciality":"varchar(60)"},"old":null,"pkNames":null,"sql":"","sqlType":{"name":12,"speciality":12},"table":"stu","ts":1609080224938,"type":"INSERT"}
