日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)

發布時間:2023/12/20 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

點擊上方藍色字體,關注我

canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。

準備

配置MySQL的binlog

常見的binlog命令

# 是否啟用binlog日志

show variables like 'log_bin';

# 查看binlog類型

show global variables like 'binlog_format';

# 查看詳細的日志配置信息

show global variables like '%log%';

# mysql數據存儲目錄

show variables like '%dir%';

# 查看binlog的目錄

show global variables like "%log_bin%";

# 查看當前服務器使用的biglog文件及大小

show binary logs;

# 查看最新一個binlog日志文件名稱和Position

show master status;

對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下

[mysqld]

log-bin=mysql-bin # 開啟 binlog

binlog-format=ROW # 選擇 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

授權

授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

部署canal

安裝canal

下載:https://github.com/alibaba/canal/releases

解壓縮

[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz ?-C /opt/modules/canal/

目錄結構

drwxr-xr-x 2 root root 4096 Mar ?5 14:19 bin

drwxr-xr-x 5 root root 4096 Mar ?5 13:54 conf

drwxr-xr-x 2 root root 4096 Mar ?5 13:04 lib

drwxrwxrwx 4 root root 4096 Mar ?5 14:19 logs

配置修改

修改conf/example/instance.properties,修改內容如下:

## mysql serverId

canal.instance.mysql.slaveId = 1234

#position info,需要改成自己的數據庫信息

canal.instance.master.address = kms-1.apache.com:3306

#username/password,需要改成自己的數據庫信息

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

# mq config,kafka topic名稱

canal.mq.topic=test

修改conf/canal.properties,修改內容如下:

# 配置zookeeper地址

canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181

# 可選項: tcp(默認), kafka, RocketMQ,

canal.serverMode = kafka

# 配置kafka地址

canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

啟動canal

sh bin/startup.sh

關閉canal

sh bin/stop.sh

部署Canal Admin(可選)

canal-admin設計上是為canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。

要求

canal-admin的限定依賴:

MySQL,用于存儲配置和節點等相關數據

canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態運維管理接口)

安裝canal-admin

下載

https://github.com/alibaba/canal/releases

解壓縮

[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz -C /opt/modules/canal-admin/

目錄結構

drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 bin

drwxrwxr-x 3 kms kms 4096 Mar 6 11:25 conf

drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 lib

drwxrwxr-x 2 kms kms 4096 Sep 2 2019 logs

配置修改

vi conf/application.yml

server:

port: 8089

spring:

jackson:

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT+8

spring.datasource:

address: kms-1:3306

database: canal_manager

username: canal

password: canal

driver-class-name: com.mysql.jdbc.Driver

url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

hikari:

maximum-pool-size: 30

minimum-idle: 1

canal:

adminUser: admin

adminPasswd: admin

初始化原數據庫

mysql -uroot -p

# 導入初始化SQL

#注:(1)初始化SQL腳本里會默認創建canal_manager的數據庫,建議使用root等有超級權限的賬號進行初始化

# (2)canal_manager.sql默認會在conf目錄下

> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql

啟動canal-admin

sh bin/startup.sh

訪問

可以通過http://kms-1:8089/訪問,默認密碼:admin/123456

canal-server端配置

使用canal_local.properties的配置覆蓋canal.properties,將下面配置內容配置在canal_local.properties文件里面,就可以了。

# register ip

canal.register.ip =

# canal admin config

canal.admin.manager = 127.0.0.1:8089

canal.admin.port = 11110

canal.admin.user = admin

canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# admin auto register

canal.admin.register.auto = true

canal.admin.register.cluster =

啟動canal-serve

sh bin/startup.sh local

注意:先啟canal-server,然后再啟動canal-admin,之后登陸canal-admin就可以添加serve和instance了。

啟動kafka控制臺消費者測試

bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 --topic test --from-beginning

此時MySQL數據表若有變化,會將row類型的log寫進Kakfa,具體格式為JSON:

insert操作

{

"data":[

{

"id":"338",

"city":"成都",

"province":"四川省"

}

],

"database":"qfbap_ods",

"es":1583394964000,

"id":2,

"isDdl":false,

"mysqlType":{

"id":"int(11)",

"city":"varchar(256)",

"province":"varchar(256)"

},

"old":null,

"pkNames":[

"id"

],

"sql":"",

"sqlType":{

"id":4,

"city":12,

"province":12

},

"table":"code_city",

"ts":1583394964361,

"type":"INSERT"

}

update操作

{

"data":[

{

"id":"338",

"city":"綿陽市",

"province":"四川省"

}

],

"database":"qfbap_ods",

"es":1583395177000,

"id":3,

"isDdl":false,

"mysqlType":{

"id":"int(11)",

"city":"varchar(256)",

"province":"varchar(256)"

},

"old":[

{

"city":"成都"

}

],

"pkNames":[

"id"

],

"sql":"",

"sqlType":{

"id":4,

"city":12,

"province":12

},

"table":"code_city",

"ts":1583395177408,

"type":"UPDATE"

}

delete操作

{

"data":[

{

"id":"338",

"city":"綿陽市",

"province":"四川省"

}

],

"database":"qfbap_ods",

"es":1583395333000,

"id":4,

"isDdl":false,

"mysqlType":{

"id":"int(11)",

"city":"varchar(256)",

"province":"varchar(256)"

},

"old":null,

"pkNames":[

"id"

],

"sql":"",

"sqlType":{

"id":4,

"city":12,

"province":12

},

"table":"code_city",

"ts":1583395333208,

"type":"DELETE"

}

JSON日志格式解釋

data:最新的數據,為JSON數組,如果是插入則表示最新插入的數據,如果是更新,則表示更新后的最新數據,如果是刪除,則表示被刪除的數據

database:數據庫名稱

es:事件時間,13位的時間戳

id:事件操作的序列號,1,2,3...

isDdl:是否是DDL操作

mysqlType:字段類型

old:舊數據

pkNames:主鍵名稱

sql:SQL語句

sqlType:是經過canal轉換處理的,比如unsigned int會被轉化為Long,unsigned long會被轉換為BigDecimal

table:表名

ts:日志時間

type:操作類型,比如DELETE,UPDATE,INSERT

小結

本文首先介紹了MySQL binlog日志的配置以及Canal的搭建,然后描述了通過canal數據傳輸到Kafka的配置,最后對canal解析之后的JSON數據進行了詳細解釋。本文是基于Canal與Flink實現數據實時增量同步的第一篇,在下一篇介紹如何使用Flink實現實時增量數據同步。

總結

以上是生活随笔為你收集整理的flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。