日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch

發布時間:2025/3/12 数据库 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述

把 mysql 的數據遷移到 es 有很多方式,比如直接用 es 官方推薦的 logstash 工具,或者監聽 mysql 的 binlog 進行同步,可以結合一些開源的工具比如阿里的 canal。

這里打算詳細介紹另一個也是不錯的同步方案,這個方案基于 kafka 的連接器。流程可以概括為:

mysql連接器監聽數據變更,把變更數據發送到 kafka topic。

ES 監聽器監聽kafka topic 消費,寫入 ES。

Kafka Connect有兩個核心概念:Source和Sink。 Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱為Connector,也就是連接器。在本例中,mysql的連接器是source,es的連接器是sink。

這些連接器本身已經開源,我們之間拿來用即可。不需要再造輪子。

過程詳解

準備連接器工具

我下面所有的操作都是在自己的mac上進行的。

首先我們準備兩個連接器,分別是 kafka-connect-elasticsearch 和 kafka-connect-elasticsearch, 你可以通過源碼編譯他們生成jar包,源碼地址:

我個人不是很推薦這種源碼的編譯方式,因為真的好麻煩。除非想研究源碼。

我是直接下載 confluent 平臺的工具包,里面有編譯號的jar包可以直接拿來用,下載地址:

我下載的是 confluent-5.3.1 版本, 相關的jar包在 confluent-5.3.1/share/java 目錄下

我們把編譯好的或者下載的jar包拷貝到kafka的libs目錄下。拷貝的時候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相關的依賴包也要一起拷貝過來,比如es這個jar包目錄下的http相關的,jersey相關的等,否則會報各種 java.lang.NoClassDefFoundError 的錯誤。

另外mysql-connector-java-5.1.22.jar也要放進去。

數據庫和ES環境準備

數據庫和es我都是在本地啟動的,這個過程具體就不說了,網上有很多參考的。

我創建了一個名為test的數據庫,里面有一個名為login的表。

配置連接器

這部分是最關鍵的,我實際操作的時候這里也是最耗時的。

首先配置jdbc的連接器。

我們從confluent工具包里拷貝一個配置文件的模板(confluent-5.3.1/share目錄下),自帶的只有sqllite的配置文件,拷貝一份到kafka的config目錄下,改名為sink-quickstart-mysql.properties,文件內容如下:

# tasks to create:

name=mysql-login-connector

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111

mode=timestamp+incrementing

timestamp.column.name=login_time

incrementing.column.name=id

topic.prefix=mysql.

table.whitelist=login

connection.url指定要連接的數據庫,這個根據自己的情況修改。mode指示我們想要如何查詢數據。在本例中我選擇incrementing遞增模式和timestamp 時間戳模式混合的模式, 并設置incrementing.column.name遞增列的列名和時間戳所在的列名。

混合模式還是比較推薦的,它能盡量的保證數據同步不丟失數據。具體的原因大家可以查閱相關資料,這里就不詳述了。

topic.prefix是眾多表名之前的topic的前綴,table.whitelist是白名單,表示要監聽的表,可以使組合多個表。兩個組合在一起就是該表的變更topic,比如在這個示例中,最終的topic就是mysql.login。

connector.class是具體的連接器處理類,這個不用改。

其它的配置基本不用改。

接下來就是ES的配置了。同樣也是拷貝 quickstart-elasticsearch.properties 文件到kafka的config目錄下,然后修改,我自己的環境內容如下:

name=elasticsearch-sink

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1

topics=mysql.login

key.ignore=true

connection.url=http://localhost:9200

type.name=mysqldata

topics的名字和上面mysql設定的要保持一致,同時這個也是ES數據導入的索引。從里也可以看出,ES的連接器一個實例只能監聽一張表。

type.name需要關注下,我使用的ES版本是7.1,我們知道在7.x的版本中已經只有一個固定的type(_doc)了,使用低版本的連接器在同步的時候會報錯誤,我這里使用的5.3.1版本已經兼容了。繼續看下面的章節就知道了。

關于es連接器和es的兼容性問題,有興趣的可以看看下面這個issue:

啟動測試

當然首先啟動zk和kafka。

然后我們啟動mysql的連接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接著手動往login表插入幾條記錄,正常情況下這些變更已經發到kafka對應的topic上去了。為了驗證,我們在控制臺啟動一個消費者從mysql.login主題讀取數據:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

可以看到剛才插入的數據。

把數據從 MySQL 移動到 Kafka 里就算完成了,接下來把數據從 Kafka 寫到 ElasticSearch 里。

首先啟動ES和kibana,當然后者不是必須的,只是方便我們在IDE環境里測試ES。你也可以通過控制臺給ES發送HTTP的指令。

先把之前啟動的mysql連接器進程結束(因為會占用端口),再啟動 ES 連接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

如果正常的話,ES這邊應該已經有數據了。打開kibana的開發工具,在console里執行

GET _cat/indices

這是獲取節點上所有的索引,你應該能看到,

green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb

說明索引已經正常創建了。然后我們查詢下,

GET mysql.login/_search?pretty=true

結果如下,

{

"took" : 1,

"timed_out" : false,

"_shards" : {

"total" : 1,

"successful" : 1,

"skipped" : 0,

"failed" : 0

},

"hits" : {

"total" : {

"value" : 4,

"relation" : "eq"

},

"max_score" : 1.0,

"hits" : [

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+0",

"_score" : 1.0,

"_source" : {

"id" : 1,

"username" : "lucas1",

"login_time" : 1575870785000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+1",

"_score" : 1.0,

"_source" : {

"id" : 2,

"username" : "lucas2",

"login_time" : 1575870813000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+2",

"_score" : 1.0,

"_source" : {

"id" : 3,

"username" : "lucas3",

"login_time" : 1575874031000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+3",

"_score" : 1.0,

"_source" : {

"id" : 4,

"username" : "lucas4",

"login_time" : 1575874757000

}

}

]

}

}

參考:

1.《kafka權威指南》

關注公眾號:犀牛飼養員的技術筆記

總結

以上是生活随笔為你收集整理的ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。