日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)

發布時間:2024/3/13 数据库 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

《OpenShift / RHEL / DevSecOps 匯總目錄》
說明:本文已經在OpenShift 4.10環境中驗證

文章目錄

  • 場景說明
  • 部署環境
    • 安裝CDC源和目標數據庫
      • 安裝 MySQL
      • 安裝 PostgreSQLSQL
    • 安裝 AMQ Stream 環境
      • 安裝 AMQ Stream Opeartor
      • 創建 Kafka 實例
      • 創建 KafkaConnect 用到的 Image
      • 配置 KafkaConnect
      • 配置 KafkaConnector
        • MySqlConnector
        • JdbcSinkConnector
      • 環境檢查
  • CDC 驗證
    • 數據同步
    • 添加數據
    • 更新數據
    • 刪除數據
  • 演示視頻
  • 參考

場景說明

本文使用 OpenShift 的 AMQ Steams(即企業版 Kafka)和 Redhat 主導的 CDC 開源項目 Debezium 來實現從 MySQL 到 PostgreSQL 數據庫的數據同步。

上圖中的 Kafka Connector 提供了訪問源或目標的參數, 而 Kafka Connect 為訪問源或目標的實際運行環境,該環境運行在相關容器中。

注意:本文操作需要用到 access.redhat.com 賬號,另外還需有一個鏡像 Registry 服務的賬號,本文使用的是 quay.io Registry 服務。

部署環境

首先創建一個項目

$ oc project db-cdc

安裝CDC源和目標數據庫

安裝 MySQL

  • 在 OpenShift 控制臺的 “開發者” 視圖的 “+添加” 中找到 “數據庫”,然后點擊 MySQL (Ephemeral)。在 “實例化模板” 界面中提供如圖配置,最后點擊創建。
  • 運行以下命令,在 MySQL 中創建測試數據表和記錄。
  • $ wget https://raw.githubusercontent.com/liuxiaoyu-git/debezium_openshift/master/mysql/inventory.sql $ MYSQL_POD=$(oc get pods --output=jsonpath={.items..metadata.name} -l name=mysql) $ oc cp inventory.sql $MYSQL_POD:/tmp/ $ oc exec $MYSQL_POD -- sh -c 'mysql -uroot < /tmp/inventory.sql' $ oc exec $MYSQL_POD -it -- mysql -u mysqluser -pmysqlpassword inventory mysql> select * from customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+

    安裝 PostgreSQLSQL

  • 在 OpenShift 控制臺的 “開發者” 視圖的 “+添加” 中找到 “數據庫”,然后點擊 PostgreSQL (Ephemeral)。在 “實例化模板” 界面中提供如圖配置,最后點擊創建。
  • 運行以下命令,確認當前在 PostgreSQL 中無 customers 表。
  • $ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers; ERROR 1146 (42S02): Table 'inventory.customers1' doesn't exist

    安裝 AMQ Stream 環境

    安裝 AMQ Stream Opeartor

    在 OpenShift 中使用默認配置安裝 AMQ Stream Opeartor,步驟略。

    創建 Kafka 實例

    在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 kafka 服務。

    apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata:name: my-cluster spec:kafka:config:offsets.topic.replication.factor: 3transaction.state.log.replication.factor: 3transaction.state.log.min.isr: 2default.replication.factor: 3min.insync.replicas: 2inter.broker.protocol.version: '3.1'storage:type: ephemerallisteners:- name: plainport: 9092type: internaltls: false- name: tlsport: 9093type: internaltls: trueversion: 3.1.0replicas: 3entityOperator:topicOperator: {}userOperator: {}zookeeper:storage:type: ephemeralreplicas: 3

    創建后會在 OpenShift 中看到部署的相關資源。

    創建 KafkaConnect 用到的 Image

  • 登錄 registry.redhat.io。
  • $ podman login registry.redhat.io
  • 使用以下內容創建名為 Dockerfile 的文件,其中包含訪問 MySQL 日志的 Connect Plugin、訪問 PostgreSQL 和 Kafka 的 JDBC Driver。
  • FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0-4.1652296055USER root:rootRUN mkdir -p /opt/kafka/plugins/debeziumARG POSTGRES_VERSION=42.2.8 ARG KAFKA_JDBC_VERSION=5.3.2 ARG MYSQL_CONNECTOR_PLUGIN_VERSION=1.9.5# Deploy MySQL Connect Plugin RUN cd /opt/kafka/plugins/debezium/ &&\curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/$MYSQL_CONNECTOR_PLUGIN_VERSION.Final/debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\tar -xf debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\rm -f debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz# Deploy PostgreSQL JDBC Driver RUN cd /opt/kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar# Deploy Kafka Connect JDBC RUN cd /opt/kafka/plugins/debezium/ &&\curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jarUSER 1001
  • 在 Dockerfile 文件所在目錄運行命令,在本地構建鏡像。說明:請根據鏡像 Registry 服務地址和賬號名修改命令中的鏡像名稱。
  • $ podman build -t quay.io/dawnskyliu/connect-debezium:v1 .
  • 登錄 Registry 服務器,然后將生成的本地鏡像推送到鏡像 Registry 服務器上。
  • $ podman login quay.io $ podman push quay.io/dawnskyliu/connect-debezium:v1
  • 在鏡像 Registry 服務器上確認 connect-debezium 是 Public 訪問類型的 Repository。
  • 配置 KafkaConnect

    在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnect 對象,其中使用了前面生成的 “quay.io/dawnskyliu/connect-debezium:v1” 鏡像。

    apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata:name: my-connect-clusterannotations:strimzi.io/use-connector-resources: "true" spec:version: 3.1.0replicas: 1image: 'quay.io/dawnskyliu/connect-debezium:v1'bootstrapServers: 'my-cluster-kafka-bootstrap:9093'tls:trustedCertificates:- secretName: my-cluster-cluster-ca-certcertificate: ca.crtconfig:group.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-statusconfig.storage.replication.factor: 1offset.storage.replication.factor: 1status.storage.replication.factor: 1config.storage.min.insync.replicas: 1offset.storage.min.insync.replicas: 1status.storage.min.insync.replicas: 1

    配置 KafkaConnector

    MySqlConnector

    在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnector 對象。

    apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: mysql-source-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:"database.hostname": "mysql""database.ssl.mode": "disabled""database.allowPublicKeyRetrieval": "true""database.port": "3306""database.user": "debezium""database.password": "dbz""database.server.id": "1""database.server.name": "dbserver1""database.include": "inventory""database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092""database.history.kafka.topic": "schema-changes.inventory""transforms": "route""transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter""transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)""transforms.route.replacement": "$3"

    JdbcSinkConnector

    在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnector 對象。

    apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: postgresql-sink-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.confluent.connect.jdbc.JdbcSinkConnectortasksMax: 1config:"topics": "customers""connection.url": "jdbc:postgresql://postgresql:5432/inventory?user=postgresuser&password=postgrespw""transforms": "unwrap""transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState""transforms.unwrap.drop.tombstones": "false""auto.create": "true""insert.mode": "upsert""delete.enabled": "true""pk.fields": "id""pk.mode": "record_key"

    環境檢查

    在 AMQ Streams 的 Operator 中確認 Kafka,Kafka Connect 和 Kafka Connector 的運行狀態。


    CDC 驗證

    數據同步

    確認 customers 表和數據已經從 MySQL 同步到 PostgreSQL 中。

    $ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers;last_name | id | first_name | email -----------+------+------------+-----------------------Thomas | 1001 | Sally | sally.thomas@acme.comBailey | 1002 | George | gbailey@foobar.comWalker | 1003 | Edward | ed@walker.comKretchmar | 1004 | Anne | annek@noanswer.org (4 rows)

    添加數據

    在 MySQL 中執行命令添加新數據,然后在 PostgreSQL 確認變化數據已同步。

    mysql> INSERT INTO customers VALUES (default,"test1","test1","test1@acme.com");

    更新數據

    在 MySQL 中執行命令更新數據,然后在 PostgreSQL 確認變化數據已同步。

    mysql> update customers set first_name='Test' where id = 1001;

    刪除數據

    在 MySQL 中執行命令刪除數據,然后在 PostgreSQL 確認變化數據已同步。

    mysql> delete from customers where first_name='test1';

    演示視頻

    視頻

    參考

    https://github.com/liuxiaoyu-git/debezium_openshift
    https://debezium.io/documentation/reference/1.9/operations/openshift.html
    https://github.com/debezium/debezium-examples/tree/main/openshift
    https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/

    總結

    以上是生活随笔為你收集整理的OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。