
Real-time MySQL change monitoring with Canal + Kafka


1. Install and start MySQL

For detailed MySQL 5.7 installation steps, see: https://blog.csdn.net/weixin_38959210/article/details/122051301

Note that binlog must be enabled in the MySQL configuration file:

# On Windows, this usually means editing my.ini in the MySQL directory;
# on Linux, editing my.cnf under /etc/mysql

[mysqld]
# added section; server-id can be any unique value
server-id = 12345
log-bin = mysql-bin
# must be ROW
binlog_format = ROW
# must be FULL (this parameter is available from MySQL 5.6 onward)
binlog_row_image = FULL
expire_logs_days = 10
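After restarting mysqld, it is worth confirming that the settings took effect, and that the account Canal will use can read the binlog. A quick check from the shell; the dedicated canal/canal account below is an assumption following common Canal practice (the instance config later in this article simply uses root):

mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"

# optional: a dedicated replication account for Canal instead of root
mysql -uroot -p -e "CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;"

Expect log_bin = ON and binlog_format = ROW; anything else means the my.cnf changes were not picked up.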

2. Install and start Kafka

For single-node Kafka startup on Linux, see: https://qushen.blog.csdn.net/article/details/122682000

Java 1.8+ is required. Download Kafka, unpack it, and enter its bin directory. Kafka ships with a bundled ZooKeeper, which is fine for self-testing; for production, run ZooKeeper separately:

nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &
nohup ./kafka-server-start.sh ../config/server.properties &
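Before wiring Canal in, you can pre-create the topic it will publish to (named test in the instance config below). This assumes Kafka 2.2+, where kafka-topics.sh takes --bootstrap-server; older releases use --zookeeper instead:

./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test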

3. Install and configure Canal

Download canal.deployer-1.1.5.tar.gz: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

Two files need to be configured.

/conf/canal.properties

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ (choose kafka here)
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
# set your Kafka address here
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
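The file is long, but for this setup only two lines actually depart from the defaults: switching the server mode to Kafka and pointing Canal at the broker. If everything else is untouched, these are the settings to double-check:

canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092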

/conf/example/instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
## this id only needs to differ from the MySQL server-id
canal.instance.mysql.slaveId=20

# enable gtid use true/false
canal.instance.gtidon=false

# position info (the database address)
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/cdc
#canal.instance.tsdb.dbUsername=root
#canal.instance.tsdb.dbPassword=123456

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password (the database credentials)
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config (the Kafka topic)
canal.mq.topic=test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
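With canal.instance.filter.regex=.*\\..*, every table in every schema is captured. To narrow the stream, the filter takes a comma-separated list of schema\\.table regexes. A small sketch, with a hypothetical shop schema purely for illustration:

# only capture shop.orders plus any shop table whose name starts with user_
canal.instance.filter.regex=shop\\.orders,shop\\.user_.*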

Start Canal:

./bin/startup.sh
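If startup succeeds, the server log and the example instance log both appear under logs/ in the Canal directory; tailing them is the quickest way to spot connection or authentication errors:

tail -f logs/canal/canal.log
tail -f logs/example/example.log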

A Kafka consumer can now pick up database changes in real time.
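The simplest way to watch the stream is Kafka's console consumer, pointed at the test topic configured above:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

Because canal.mq.flatMessage = true, each record is a self-describing JSON document. As a rough sketch, an INSERT into a hypothetical test.user table produces a message along these lines (all field values here are illustrative, not captured output):

{
  "id": 0,
  "database": "test",
  "table": "user",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "INSERT",
  "es": 1640966400000,
  "ts": 1640966400123,
  "sql": "",
  "sqlType": {"id": 4, "name": 12},
  "mysqlType": {"id": "int(11)", "name": "varchar(255)"},
  "data": [{"id": "1", "name": "alice"}],
  "old": null
}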

The fields of the JSON message above are explained as follows (the descriptions follow the Canal-JSON protocol; the rows that mention TiCDC apply when the producer is TiCDC rather than Canal):

| Field | Type | Description |
| --- | --- | --- |
| id | Number | TiCDC sets this to 0 by default |
| database | String | Name of the database the row belongs to |
| table | String | Name of the table the row belongs to |
| pkNames | Array | Names of all columns that make up the primary key |
| isDdl | Bool | Whether this message is a DDL event |
| type | String | Event type as defined by Canal-JSON |
| es | Number | 13-digit (millisecond) timestamp of when the event occurred |
| ts | Number | 13-digit (millisecond) timestamp of when TiCDC generated the message |
| sql | String | When isDdl is true, the corresponding DDL statement |
| sqlType | Object | When isDdl is false, the Java type representation of each column |
| mysqlType | Object | When isDdl is false, the MySQL type of each column |
| data | Object | When isDdl is false, the name and value of each column |
| old | Object | Present only for Update events; the name of each column and its value before the update |
| _tidb | Object | TiDB extension field, present only when enable-tidb-extension is true; its commitTs is the TSO of the transaction that caused the row change |
