日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql hadoop架构,Debezium实现Mysql到Elasticsearch高效实时同步

發(fā)布時間:2025/3/15 数据库 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql hadoop架构,Debezium实现Mysql到Elasticsearch高效实时同步 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

問題導(dǎo)讀

1. Debezium有哪些特點?

2.Debezium如何實現(xiàn)Mysql到ES增刪改實時同步?

3.如何配置connector連接器?

題記

來自Elasticsearch中文社區(qū)的問題——

MySQL中表無唯一遞增字段,也無唯一遞增時間字段,該怎么使用logstash實現(xiàn)MySQL實時增量導(dǎo)數(shù)據(jù)到es中?復(fù)制代碼logstash和kafka_connector都僅支持基于自增id或者時間戳更新的方式增量同步數(shù)據(jù)。回到問題本身:如果庫表里沒有相關(guān)字段,該如何處理呢?

本文給出相關(guān)探討和解決方案。

1、 binlog認(rèn)知

1.1 啥是 binlog?

binlog是Mysql sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志;

其主要是用來記錄對mysql數(shù)據(jù)更新或潛在發(fā)生更新的SQL語句,并以"事務(wù)"的形式保存在磁盤中;作用主要有:

1)復(fù)制:達(dá)到master-slave數(shù)據(jù)一致的目的。

2)數(shù)據(jù)恢復(fù):通過mysqlbinlog工具恢復(fù)數(shù)據(jù)。

3)增量備份。

1.2 阿里的Canal實現(xiàn)了增量Mysql同步

a01.jpg (24.4 KB, 下載次數(shù): 1)

2021-3-4 10:51 上傳一圖勝千言,canal是用java開發(fā)的基于數(shù)據(jù)庫增量日志解析、提供增量數(shù)據(jù)訂閱&消費的中間件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關(guān)數(shù)據(jù)。目的:增量數(shù)據(jù)訂閱&消費。綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實現(xiàn)增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 開源工程,地址:. https://debezium.io/

2)不依賴第三方的獨立應(yīng)用: Maxwell開源項目,地址:http://maxwells-daemon.io/

由于已經(jīng)部署過conluent(kafka的企業(yè)版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。

3、Debezium介紹

Debezium是捕獲數(shù)據(jù)實時動態(tài)變化的開源的分布式同步平臺。能實時捕獲到數(shù)據(jù)源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、刪除(deletes)操作,實時同步到Kafka,穩(wěn)定性強且速度非常快。

特點:

1)簡單。無需修改應(yīng)用程序。可對外提供服務(wù)。

2)穩(wěn)定。持續(xù)跟蹤每一行的每一處變動。

3)快速。構(gòu)建于kafka之上,可擴展,經(jīng)官方驗證可處理大容量的數(shù)據(jù)。

4、同步架構(gòu)

a02.png (33.92 KB, 下載次數(shù): 1)

2021-3-4 10:52 上傳如圖,Mysql到ES的同步策略,采取“曲線救國”機制。

步驟1: 基Debezium的binlog機制,將Mysql數(shù)據(jù)同步到Kafka。

步驟2: 基于Kafka_connector機制,將kafka數(shù)據(jù)同步到Elasticsearch。

5、Debezium實現(xiàn)Mysql到ES增刪改實時同步

軟件版本:confluent:5.1.2;

Debezium:0.9.2_Final;

Mysql:5.7.x.

Elasticsearch:6.6.1復(fù)制代碼5.1 Debezium安裝

confluent的安裝部署參見:http://t.cn/Ef5poZk,不再贅述。

Debezium的安裝只需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓后的插件目錄(share/java)中。MySQL Connector plugin 壓縮包的下載地址:https://debezium.io/docs/install/。注意重啟一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相關(guān)配置。

Debezium使用MySQL的binlog機制實現(xiàn)數(shù)據(jù)動態(tài)變化監(jiān)測,所以需要Mysql提前配置binlog。核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。[mysqld]

server-id? ?? ?? ?= 223344

log_bin? ?? ?? ???= mysql-bin

binlog_format? ???= row

binlog_row_image??= full

expire_logs_days??= 10復(fù)制代碼

然后,重啟一下Mysql以使得binlog生效。systemctl start mysqld.service復(fù)制代碼

5.3 配置connector連接器。

配置confluent路徑目錄 : /etc

創(chuàng)建文件夾命令 :mkdir kafka-connect-debezium復(fù)制代碼在mysql2kafka_debezium.json存放connector的配置信息 :[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json

{

"name" : "debezium-mysql-source-0223",

"config":

{

"connector.class" : "io.debezium.connector.mysql.MySqlConnector",

"database.hostname" : "192.168.1.22",

"database.port" : "3306",

"database.user" : "root",

"database.password" : "XXXXXX",

"database.whitelist" : "kafka_base_db",

"table.whitlelist" : "accounts",

"database.server.id" : "223344",

"database.server.name" : "full",

"database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",

"database.history.kafka.topic" : "account_topic",

"include.schema.changes" : "true" ,

"incrementing.column.name" : "id",

"database.history.skip.unparseable.ddl" : "true",

"transforms": "unwrap,changetopic",

"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

"transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",

"transforms.changetopic.regex":"(.*)",

"transforms.changetopic.replacement":"$1-smt"

}

}復(fù)制代碼注意如下配置:

“database.server.id”,對應(yīng)Mysql中的server-id的配置。

“database.whitelist” : 待同步的Mysql數(shù)據(jù)庫名。

“table.whitlelist” :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存儲數(shù)據(jù)庫的Shcema的記錄信息,而非寫入數(shù)據(jù)的topic、

“database.server.name”:邏輯名稱,每個connector確保唯一,作為寫入數(shù)據(jù)的kafka topic的前綴名稱。

坑一:transforms相關(guān)5行配置作用是寫入數(shù)據(jù)格式轉(zhuǎn)換。

如果沒有,輸入數(shù)據(jù)會包含:before、after記錄修改前對比信息以及元數(shù)據(jù)信息(source,op,ts_ms等)。這些信息在后續(xù)數(shù)據(jù)寫入Elasticsearch是不需要的。(注意結(jié)合自己業(yè)務(wù)場景)。格式轉(zhuǎn)換相關(guān)原理:https://www.confluent.io/blog/si ... thereabouts-part-3/

5.4 啟動connectorcurl -X POST -H "Content-Type:application/json"

--data @mysql2kafka_debezium.json.json

http://192.168.1.22:18083/connectors | jq復(fù)制代碼

5.5 驗證寫入是否成功。

查看kafka-topickafka-topics --list --zookeeper localhost:2181復(fù)制代碼

此處會看到寫入數(shù)據(jù)topic的信息。注意新寫入數(shù)據(jù)topic的格式:database.schema.table-smt 三部分組成。本示例topic名稱:full.kafka_base_db.account-smt。

消費數(shù)據(jù)驗證寫入是否正常

./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium實現(xiàn)mysql同步kafka完成。

6、kafka-connector實現(xiàn)kafka同步Elasticsearch

6.1、Kafka-connector介紹

見官網(wǎng):https://docs.confluent.io/current/connect.html

Kafka Connect是一個用于連接Kafka與外部系統(tǒng)(如數(shù)據(jù)庫,鍵值存儲,檢索系統(tǒng)索引和文件系統(tǒng))的框架。連接器實現(xiàn)公共數(shù)據(jù)源數(shù)據(jù)(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫,也可以自己開發(fā)連接器。

6.2、kafka到ES connector同步配置

配置路徑:/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties復(fù)制代碼

配置內(nèi)容:"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"tasks.max": "1",

"topics": "full.kafka_base_db.account-smt",

"key.ignore": "true",

"connection.url": "http://192.168.1.22:9200",

"type.name": "_doc",

"name": "elasticsearch-sink-test"復(fù)制代碼6.3 kafka到ES啟動connector

啟動命令confluent load??elasticsearch-sink-test

-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties復(fù)制代碼

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector詳情信息可以借助postman或者瀏覽器或者命令行查看。curl -X GET http://localhost:8083/connectors復(fù)制代碼

7、坑復(fù)盤。

坑2: 同步的過程中可能出現(xiàn)錯誤,比如:kafka topic沒法消費到數(shù)據(jù)。排解思路如下:

1)確認(rèn)消費的topic是否是寫入數(shù)據(jù)的topic;

2)確認(rèn)同步的過程中沒有出錯。可以借助connector如下命令查看。curl -X GET http://localhost:8083/connectors-xxx/status復(fù)制代碼

坑3: Mysql2ES出現(xiàn)日期格式不能識別。

是Mysql jar包的問題,解決方案:在my.cnf中配置時區(qū)信息即可。

坑4: kafka2ES,ES沒有寫入數(shù)據(jù)。

排解思路:

1)建議:先創(chuàng)建同topic名稱一致的索引,注意:Mapping靜態(tài)自定義,不要動態(tài)識別生成。

2)通過connetor/status排查出錯原因,一步步分析。

最新經(jīng)典文章,歡迎關(guān)注公眾號

原文鏈接:https://blog.csdn.net/laoyang360/article/details/87897886

總結(jié)

以上是生活随笔為你收集整理的mysql hadoop架构,Debezium实现Mysql到Elasticsearch高效实时同步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。