

Basic Usage of Debezium (Using MySQL as an Example)

Published: 2024/3/13 · Database · 豆豆
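  • Original content from the GreatSQL community may not be used without authorization. To reprint it, please contact the editor and credit the source.
  • GreatSQL is a Chinese domestic branch of MySQL and is used in the same way as MySQL.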

I. Introduction to Debezium

From the official website:

    Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

Put simply, Debezium captures every row-level data change in a database, wraps each change as an event, and emits the events as a stream in the order in which the changes occurred.

II. Basic Usage

The following uses MySQL as an example to show the basic usage of Debezium.

1. Preparing MySQL

  • Create a MySQL user and grant it the required privileges, for example:

        CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
        GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';

  • Check whether MySQL has `log-bin` enabled:

    ```sql
    SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables
    WHERE variable_name = 'log_bin';
    -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- run this statement first, then run the query above again:
    -- SET GLOBAL show_compatibility_56 = ON;
    -- On MySQL 8.0, where this table no longer exists, query performance_schema.global_variables instead.
    ```

    If the value is OFF, edit the MySQL configuration file along these lines:

        server-id        = 223344     # required
        log_bin          = mysql-bin  # base name of the binlog file sequence
        binlog_format    = ROW        # must be ROW
        binlog_row_image = FULL       # must be FULL
        expire_logs_days = 10         # adjust to your situation

  • Prepare a database and table:

        create database inventory;
        create table inventory.a (id bigint primary key auto_increment, name varchar(32));
        insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
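The binlog requirements listed above (`log_bin` on, `binlog_format = ROW`, `binlog_row_image = FULL`) can also be checked in code before the engine is started. The following is only a minimal sketch: `BinlogSettingsCheck` and `problems` are hypothetical names that are not part of Debezium or MySQL, and the map of server variables is assumed to be filled by the caller, for example from the result of `SHOW GLOBAL VARIABLES`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Debezium): compares server variables
// against the values the article lists as required for change capture.
class BinlogSettingsCheck {

    // Returns one message per setting that does not match; an empty list means OK.
    static List<String> problems(Map<String, String> serverVariables) {
        Map<String, String> required = new LinkedHashMap<>();
        required.put("log_bin", "ON");
        required.put("binlog_format", "ROW");
        required.put("binlog_row_image", "FULL");

        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = serverVariables.get(e.getKey());
            // A missing variable is treated the same as a wrong value.
            if (actual == null || !e.getValue().equalsIgnoreCase(actual)) {
                problems.add(e.getKey() + " should be " + e.getValue() + " but is " + actual);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Assumed sample values; in practice they would be read from the server.
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED");
        vars.put("binlog_row_image", "FULL");
        System.out.println(problems(vars));
    }
}
```

Keeping the check free of any database access makes it easy to reuse with whatever client is used to read the variables.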
2. Writing the program

2.1. Project dependencies (Maven)

pom.xml

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>

At the time of writing, the latest stable Debezium release was 1.9.5.Final.

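2.2. Prepare the database and table

    create database inventory;
    create table inventory.a (id bigint primary key, name varchar(32));
    insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');

Skip this step if the database and table were already created while preparing MySQL.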

2.3. Writing the code

    package com.greatdb.dbzdemo;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.format.Json;

    /**
     * @author wang.jianwen
     * @version 1.0
     * @date 2022/07/29
     */
    public class DebeziumTest {

        private static DebeziumEngine<ChangeEvent<String, String>> engine;

        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.setProperty("name", "dbz-engine");
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

            // offset config begin - store the processed binlog offsets in a file
            props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
            props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
            props.setProperty("offset.flush.interval.ms", "0");
            // offset config end

            props.setProperty("database.server.name", "mysql-connector");
            props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
            props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

            props.setProperty("database.server.id", "122112"); // must differ from MySQL's server-id
            props.setProperty("database.hostname", "tmg");
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "mysqluser");
            props.setProperty("database.password", "mysqlpw");
            props.setProperty("database.include.list", "inventory"); // database(s) to capture
            props.setProperty("table.include.list", "inventory.a");  // table(s) to capture
            props.setProperty("snapshot.mode", "initial");           // full snapshot, then incremental changes

            // Create the Debezium engine from the configuration above; events are emitted as JSON strings
            engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        System.out.println(record); // print to the console
                    })
                    .using((success, message, error) -> {
                        if (error != null) {
                            // error callback
                            System.out.println("------------error, message:" + message + "exception:" + error);
                        }
                        closeEngine(engine);
                    })
                    .build();

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            addShutdownHook(engine);
            awaitTermination(executor);
            System.out.println("------------main finished.");
        }

        private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
            try {
                engine.close();
            } catch (IOException ignored) {
            }
        }

        private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
        }

        private static void awaitTermination(ExecutorService executor) {
            if (executor != null) {
                try {
                    executor.shutdown();
                    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
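The configuration above stores the binlog offsets and the schema history in files under `/tmp/dbz/storage/`. The article does not say whether the engine creates that directory when it is missing, so one cautious option is to create it before building the engine. The sketch below uses only the JDK; `StorageDirs` and `ensureParentDir` are hypothetical names introduced for illustration, not Debezium APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Debezium): makes sure the directory that
// will hold an offset or history file exists before the engine starts.
class StorageDirs {

    // Creates the parent directory of the given file path if needed and returns it.
    static Path ensureParentDir(String file) throws IOException {
        Path parent = Paths.get(file).toAbsolutePath().getParent();
        if (parent == null) {
            throw new IllegalArgumentException("path has no parent directory: " + file);
        }
        // createDirectories does nothing if the directory already exists.
        return Files.createDirectories(parent);
    }

    public static void main(String[] args) throws IOException {
        // The two file names are the ones used in the article's configuration.
        System.out.println(ensureParentDir("/tmp/dbz/storage/mysql_offsets.dat"));
        System.out.println(ensureParentDir("/tmp/dbz/storage/mysql_dbhistory.txt"));
    }
}
```

In the article's program, such calls would go at the start of `main`, before `DebeziumEngine.create(...)`.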

3. Testing

Once the program is running, the console prints:

    ...(omitted)
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    ...(omitted)

The full snapshot of the existing data has been emitted. The key parts of each event are:

    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
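Because the handler in the program receives each event value as a JSON string, the `op` field shown above can be pulled out of it in code. The sketch below uses a JDK regular expression purely for illustration; `OpExtractor` and `extractOp` are hypothetical names, and a real application would more likely use a JSON parser. It relies on the observation from the output above that `"op":"..."` appears only in the payload, while the schema part contains `"field":"op"`.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Debezium): extracts the single-letter
// operation code from the JSON value of a change event.
class OpExtractor {

    // Matches "op":"x" where x is one lowercase letter.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    static Optional<String> extractOp(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened payload modeled on the article's output.
        String value = "{\"payload\":{\"before\":null,"
                + "\"after\":{\"id\":1,\"name\":\"n1\"},\"op\":\"r\"}}";
        System.out.println(extractOp(value).orElse("none"));
    }
}
```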
    • Next, insert a row:

      insert into inventory.a values (4, 'n4');

      Console output:

      ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
    • Update a row:

      update inventory.a set name = 'n4-upd' where id = 4;

      Console output:

      ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
    • Delete a row:

      delete from inventory.a where id = 1;

      Console output:

      ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
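The four `op` values seen in this test (`r`, `c`, `u`, `d`) correspond to snapshot reads, inserts, updates, and deletes. A small lookup like the one below can make console output easier to read. `OpCodes` and `describe` are hypothetical names introduced for this sketch; any code other than the four that appear in the article falls through to a generic message.

```java
// Hypothetical helper (not part of Debezium): turns the single-letter "op"
// code of a change event into a readable description.
class OpCodes {

    static String describe(String op) {
        switch (op) {
            case "r": return "read (row emitted by the initial snapshot)";
            case "c": return "create (row inserted)";
            case "u": return "update (row modified)";
            case "d": return "delete (row removed)";
            default:  return "unknown op: " + op;
        }
    }

    public static void main(String[] args) {
        // The codes in the order they appeared during the test.
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + describe(op));
        }
    }
}
```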

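III. Summary

This article used MySQL as an example to walk through the basic flow of using Debezium from code. When common insert, update, and delete operations are performed on MySQL data, Debezium captures the row changes, records the row data before and after each change, and exposes them as an event stream that external consumers can read and process.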
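As one illustration of what "processing" the event stream can mean, the sketch below applies events in order to an in-memory copy of table `inventory.a`. It is a simplified model under stated assumptions: `InMemoryReplica` and its methods are hypothetical names, and the `op`, `id`, and `name` values are assumed to have already been extracted from each event's payload.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer-side model (not part of Debezium): keeps a local copy
// of inventory.a by applying change events in the order they arrive.
class InMemoryReplica {

    private final Map<Long, String> rows = new TreeMap<>();

    // op is the event's operation code; name is the value from "after" (unused for deletes).
    void apply(String op, long id, String name) {
        switch (op) {
            case "r": // snapshot read
            case "c": // insert
            case "u": // update
                rows.put(id, name);
                break;
            case "d": // delete
                rows.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unsupported op: " + op);
        }
    }

    // Returns a copy so callers cannot modify the replica's internal state.
    Map<Long, String> snapshot() {
        return new TreeMap<>(rows);
    }

    public static void main(String[] args) {
        InMemoryReplica replica = new InMemoryReplica();
        // The same sequence of events produced in the test section.
        replica.apply("r", 1, "n1");
        replica.apply("r", 2, "n2");
        replica.apply("r", 3, "n3");
        replica.apply("c", 4, "n4");
        replica.apply("u", 4, "n4-upd");
        replica.apply("d", 1, null);
        System.out.println(replica.snapshot()); // prints {2=n2, 3=n3, 4=n4-upd}
    }
}
```

Because the events arrive in the order the changes occurred, applying them one by one leaves the local copy matching the table's final contents.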

Reference: https://debezium.io/documentation/reference/1.8/index.html
