日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数仓实时数据同步 debezium

發布時間:2024/3/13 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数仓实时数据同步 debezium 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

數倉實時數據同步 debezium

  • 背景
  • debezium 簡介
    • 架構
    • 基本概念
  • 例子
  • Router
  • 目前遇到的問題

背景

數據湖將源庫的數據同步到hive數倉ods層,或直接在kafka中用于后面計算。源庫包括mysql、postgresql、sqlserver、oracle,大部分是mysql數據庫。當前采用的sqoop T+1全量或增量抽取的方式,時效性低,delete的數據可能無法被正確處理。

選擇debezium的原因:數據源支持眾多,使用的組件僅僅是kafka,需要進行的開發少;debezium使用kafka-connect,而且kafka 2.3版本以后 增加或修改一個任務、整個kafka-connect集群都會rebalance的情況得到優化;類似binlog的位點存儲在kafka中,不再需要引入額外的存儲也不需要關心位點;能保證at-least-once。

debezium 簡介

架構

debezium 主要是一個kafka-connect的各種數據源同步的一種source實現。
數據存儲在kafka中

基本概念

kafka-connect 獨立于kafka的服務,本項目中采用集群的部署方式,依賴kafka實現協調。

connector 針對一個連接實例,模仿從庫從主庫獲取實時binlog。 可支持mysql postgresql oracle sqlserver mongoDb 等多種數據源。connector運行在kafka-connect中

task 每個同步源數據的connector,只采用一個task(其他任務可以采用多可來保證高可用和提高并發),task是connector任務執行的最小單位。運行在kafka-connect中,作為一個線程。

例子

同步mysql binlog,檢查源庫的binlog設置,

--是否開啟binlog SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; --是否開啟行模式 row SELECT variable_value as "binlog_format" FROM information_schema.global_variables WHERE variable_name='binlog_format'; --是否補全所有字段 full SELECT variable_value as "binlog_row_image" FROM information_schema.global_variables WHERE variable_name='binlog_row_image'; #新建一個mysql的同步任務 #${connectIp} #${name} connector名稱 #${ip} mysql實例 #${port} #${user} #${password} #${serverId} 模擬從庫的servcer.id #${serverName} 此名稱會出現在topic中 ${serverName}.${schema}.${table} #${tableList} db1.table1,db2.table1,db2.table2 #${kafka} ip:9092,ip:9092 如果服務器較多隨機寫三個足夠 #${historyTopic} 存儲ddl的topic名稱,debezium內部使用curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" ${connectIp}:8083/connectors/ \ -d '{ "name": "${name}", "config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"${ip}","database.port":"${port}","database.user":"${user}","database.password":"${password}","database.server.id":"${serverId}","database.server.name":"${serverName}","table.whitelist":"${tableList}","database.history.kafka.bootstrap.servers":"${kafka}","snapshot.mode":"schema_only","tombstones.on.delete":"false","database.history.kafka.topic":"${historyTopic}","database.history.skip.unparseable.ddl":"true","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter.schemas.enable":"false","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"false","transforms.unwrap.delete.handling.mode":"rewrite","transforms.unwrap.add.fields":"source.ts_ms"} }'

主要參數,還有很多其他的參數未列出

"name": "${name}","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "${ip}","database.port": "${port}","database.user": "${user}","database.password": "${password}","database.server.id": "${serverId}","database.server.name": "${serverName}","table.whitelist": "${tableList}","database.history.kafka.bootstrap.servers": "${kafka}","snapshot.mode": "schema_only", //選擇schema_only,從當前最新的位點開始同步,不需要冷數據,冷數據用其他方式抽取"tombstones.on.delete": "false","database.history.kafka.topic": "${historyTopic}","database.history.skip.unparseable.ddl": "true","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "false","value.converter.schemas.enable": "false","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", //簡化數據結構"transforms.unwrap.drop.tombstones": "false","transforms.unwrap.delete.handling.mode": "rewrite","transforms.unwrap.add.fields": "source.ts_ms" //增加時間戳字段用于} }

kafka中的數據

{"id":1004,"first_name":"10","last_name":"1","email":"1","__source_ts_ms":1596450910000,"__deleted":"false"} //key是 {"id":1004} {"id":1004,"first_name":"11","last_name":"1","email":"1","__source_ts_ms":1596451124000,"__deleted":"false"} //key是 {"id":1004} {"id":1004,"first_name":"101","last_name":"1","email":"1","__source_ts_ms":1596606837000,"__deleted":"false"} //key是 {"id":1004} {"id":1004,"first_name":"102","last_name":"1","email":"1","__source_ts_ms":1596606992000,"__deleted":"false"} //key是 {"id":1004}

__source_ts_ms 和__deleted是配置產生的字段

Router

分表的合并,將分表數據寫到一個topic

"transforms":"Reroute,unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones":"false", "transforms.unwrap.delete.handling.mode":"rewrite","transforms.unwrap.add.fields":"source.ts_ms", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "dbserver1\\..*", "transforms.Reroute.topic.replacement": "dbserver1.all"

結果

key的值{"id":1004,"__dbz__physicalTableIdentifier":"dbserver1.inventory.customers"}===============value的值{"id":1004,"first_name":"204","last_name":"1","email":"1","__source_ts_ms":1596773158000,"__deleted":"false"}

__dbz__physicalTableIdentifier是自動增加的一個key字段來區別表,字段名稱可以改,也可以從topic名稱中匹配獲取

目前遇到的問題

  • 需要同步的表比較多,kafka topic多,對性能影響比較大。
  • kakfa 新版本kakfa-manager等基本都不能很好支持,需要自己開發來監控和管理kafka,connect集群也要自己開發監控。
  • 數據同步后如何到hive數倉更好,hbase kudu hudi 或者直接hdfs。

總結

以上是生活随笔為你收集整理的数仓实时数据同步 debezium的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。