

Syncing the MySQL binlog to ElasticSearch with Canal: A Detailed Guide

Published: 2024/8/23

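Contents

  • Syncing data to ElasticSearch
    • Basic single-table configuration
    • Adapter mapping file in detail (single-table and multi-table mappings)
      • Single-table mapping: example SQL
      • Single-table mapping: example SQL with functions or arithmetic
      • Multi-table mapping (one-to-one, many-to-one): example SQL
      • Multi-table mapping (one-to-many): example SQL
      • Other types of SQL examples
      • Notes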

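This article explains in detail how to configure Canal so that it writes data to ElasticSearch.

For setting up Canal from scratch, see: https://blog.csdn.net/zhangshenghang/article/details/120361721

Syncing data to ElasticSearch

We continue from the earlier HBase configuration and modify it directly, so that data is synced to ElasticSearch at the same time.

Basic single-table configuration

  • 1. Modify the launcher configuration {canal-adapter}/conf/application.yml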
server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter: DEBUG
    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  # tcp kafka rocketMQ rabbitMQ: the mode canal-server runs in.
  # In tcp mode the client connects directly, without middleware;
  # kafka and the MQ options are message-queue modes.
  mode: tcp
  # flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1
  retries: 0
  timeout: 1000
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer: address and port of canal-server
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181
    canal.tcp.batch.size: 1
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources: # data source configuration: where the data is read from
    defaultDS: # a unique name, referenced later in the ES configuration
      url: jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true
      username: root
      password: *****
  canalAdapters:
  - instance: example # canal instance name or MQ topic name: the instance name configured in canal
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
      - name: hbase
        properties:
          hbase.zookeeper.quorum: sangfor.abdi.node3,sangfor.abdi.node2,sangfor.abdi.node1
          hbase.zookeeper.property.clientPort: 2181
          zookeeper.znode.parent: /hbase-unsecure
      - name: es7 # name of the subdirectory under the conf directory
        hosts: 192.168.168.2:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
#          security.auth: test:123456 # only used for rest mode
          cluster.name: my_application
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
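  • 2. ElasticSearch table mapping file
# Data source key; must match a key under srcDataSources in the adapter's application.yml.
dataSourceKey: defaultDS
# Name of an instance configured in canal-server; different instances serve different business needs.
destination: example
# Group ID. In tcp mode leave this empty; if a value is set, data may not be received.
groupId:
# ES mapping
esMapping:
  # ES index name
  _index: testsync2
  # Unique identifier of the ES document, usually the primary key ID column of the table
  _id: _id
  # upsert: true
  # pk: id
  # Each table column is mapped to a specific field name; names must not repeat
  sql: "select a.id as _id, a.name,a.age,a.age_2,a.message,a.insert_time from testsync as a"
  # objFields:
  #   _labels: array:;
  # etlCondition: "where a.c_time>={}"
  commitBatch: 10
  • 3. Restart the service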
bin/restart.sh
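
The two files in steps 1 and 2 refer to each other by name, and a mismatch is an easy mistake to make. The following is a minimal sketch of such a cross-check in Python, assuming both files have already been loaded into plain dictionaries (the loading step is omitted, and `check_mapping` is a hypothetical helper, not part of Canal). It only encodes the three rules stated in the comments above: dataSourceKey must exist under srcDataSources, destination must match an instance, and groupId stays empty in tcp mode.

```python
def check_mapping(app_conf: dict, es_mapping: dict) -> list:
    """Return a list of problems found when cross-checking the two files."""
    problems = []
    canal_conf = app_conf.get("canal.conf", {})

    # dataSourceKey must name an entry under srcDataSources.
    sources = canal_conf.get("srcDataSources", {})
    if es_mapping.get("dataSourceKey") not in sources:
        problems.append("dataSourceKey not found in srcDataSources")

    # destination must name a configured canal instance.
    instances = [a.get("instance") for a in canal_conf.get("canalAdapters", [])]
    if es_mapping.get("destination") not in instances:
        problems.append("destination does not match any instance")

    # In tcp mode the mapping file leaves groupId empty.
    if canal_conf.get("mode") == "tcp" and es_mapping.get("groupId"):
        problems.append("groupId should be empty in tcp mode")
    return problems


# Dictionaries mirroring the relevant parts of the two files above.
app_conf = {
    "canal.conf": {
        "mode": "tcp",
        "srcDataSources": {"defaultDS": {}},
        "canalAdapters": [{"instance": "example"}],
    }
}
es_mapping = {"dataSourceKey": "defaultDS", "destination": "example", "groupId": None}
print(check_mapping(app_conf, es_mapping))
```

An empty list means the names line up; each string in a non-empty list points at one of the three rules.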

Write data

INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());

Check the adapter log

2021-09-20 13:53:07.279 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.286 [pool-1-thread-1] DEBUG c.a.o.c.client.adapter.hbase.service.HbaseSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.287 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
Affected indexes: testsync2
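
When there are many such log lines, it can help to pull the JSON payload out of each one instead of reading it by eye. The sketch below is one possible way to do that in Python; `parse_dml` is a hypothetical helper, and the sample line is an abridged copy of the ESSyncService entry above (most fields are left out to keep it short).

```python
import json


def parse_dml(log_line: str) -> dict:
    """Extract the JSON object that follows the 'DML: ' marker in an adapter log line."""
    marker = "DML: "
    start = log_line.index(marker) + len(marker)
    # raw_decode stops at the end of the first JSON value,
    # so any text after the payload is ignored.
    payload, _ = json.JSONDecoder().raw_decode(log_line[start:])
    return payload


# Abridged version of the ESSyncService log line shown above.
line = (
    "2021-09-20 13:53:07.287 [pool-1-thread-1] DEBUG "
    "c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: "
    '{"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","age":2}],'
    '"database":"test2","table":"testsync","type":"INSERT"}'
)
event = parse_dml(line)
print(event["type"], event["table"], len(event["data"]))
```

The `type`, `table` and `data` keys are the ones visible in the log output; the same function works on the logger and HBase lines because they share the `DML: ` prefix.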

Check the data in ElasticSearch

At this point, data is being written successfully to both ElasticSearch and HBase.
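
One way to confirm the result from a script is to ask ElasticSearch how many documents the index holds. The sketch below uses only the Python standard library and the ElasticSearch `_count` REST endpoint. It assumes the REST port is reachable at 127.0.0.1:9200 (the address mentioned in the "for rest mode" comment of application.yml) and that no authentication is required; both are assumptions about the environment, and `count_url` and `count_documents` are hypothetical helper names.

```python
import json
import urllib.request


def count_url(rest_host: str, index: str) -> str:
    """Build the URL of the _count endpoint for one index."""
    return f"http://{rest_host}/{index}/_count"


def count_documents(rest_host: str, index: str) -> int:
    """Return the number of documents in the index (needs a reachable ES node)."""
    with urllib.request.urlopen(count_url(rest_host, index), timeout=5) as resp:
        return json.load(resp)["count"]


# Only the URL is built here; call count_documents(...) against a live cluster.
print(count_url("127.0.0.1:9200", "testsync2"))
```

After the eight INSERT statements above, `count_documents("127.0.0.1:9200", "testsync2")` should report at least eight documents if the sync worked and the index was empty beforehand.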

Adapter mapping file in detail (single-table and multi-table mappings)

${adapter}/conf/es7/xxx.yml

dataSourceKey: defaultDS        # key of the source data source; matches an entry under srcDataSources configured above
outerAdapterKey: exampleKey     # matches the key of the es adapter configured in application.yml
destination: example            # canal instance or MQ topic
groupId:                        # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user           # ES index name
  _type: _doc                   # ES type name; not needed on ES7
  _id: _id                      # ES _id; if this is not configured, the pk entry below must be configured and the _id is assigned automatically by ES
#  pk: id                       # if _id is not needed, one property must be designated as the primary key
  # SQL mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # array or object property; array:; means the field value is separated by ';'
#    _obj: object               # JSON object
  etlCondition: "where a.c_time>='{0}'"     # ETL condition parameter
  commitBatch: 3000                         # commit batch size

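Notes on the SQL mapping:

The SQL supports free combination of multi-table joins, with some restrictions:

  • The main table must not be a subquery
  • Only left outer join may be used, which means the leftmost table must be the main table
  • If a joined table is a subquery, that subquery must not involve more than one table
  • The main SQL must not contain a where condition (a where condition inside a joined subquery is allowed but not recommended, because it can make the synced data inconsistent, for example when the content of a field used in the where condition is modified)
  • Join conditions may only use the '=' operation between primary and foreign keys; other constant checks must not appear, for example: on a.role_id=b.id and b.statues=1
  • At least one field of the join condition must appear in the main query's select list, for example: with on a.role_id=b.id, either a.role_id or b.id must appear in the main select statement
  • The ElasticSearch mapping properties correspond one-to-one with the values selected by the SQL (select * is not supported). For example, in select a.id as _id, a.name, a.email as _email from user, name maps to the name field of the ES mapping and _email maps to the _email field; the alias (if there is one) is used as the final mapping field. The _id here can be filled into the _id: _id mapping in the configuration file.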
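
The last rule (the alias, if present, becomes the ES field name) can be illustrated with a small Python sketch. This is not Canal's SQL parser; `split_select_list` and `es_field_name` are hypothetical helpers that only handle the simple select lists used in this article, and they exist purely to show which names end up as mapping fields.

```python
def split_select_list(select_list: str) -> list:
    """Split a select list on commas that are outside parentheses and quotes."""
    items, current, depth, in_quote = [], [], 0, False
    for ch in select_list:
        if ch == "'":
            in_quote = not in_quote
        elif not in_quote and ch == "(":
            depth += 1
        elif not in_quote and ch == ")":
            depth -= 1
        if ch == "," and depth == 0 and not in_quote:
            items.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    items.append("".join(current).strip())
    return items


def es_field_name(item: str) -> str:
    """Use the alias when there is one, otherwise the bare column name."""
    pos = item.lower().rfind(" as ")
    if pos != -1:
        return item[pos + len(" as "):].strip()
    return item.split(".")[-1].strip()


select_list = "a.id as _id, a.name, a.email as _email"
print([es_field_name(item) for item in split_select_list(select_list)])
```

For the select list in the rule above this yields `_id`, `name` and `_email`, matching the description: `a.name` keeps its column name, while the aliased columns use their aliases.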

    Single-table mapping: example SQL

    select a.id as _id, a.name, a.role_id, a.c_time from user a

    Example ES mapping corresponding to this SQL:

    {
      "mytest_user": {
        "mappings": {
          "_doc": {
            "properties": {
              "name": {"type": "text"},
              "role_id": {"type": "long"},
              "c_time": {"type": "date"}
            }
          }
        }
      }
    }
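
    A caveat if the index is created by hand before syncing: the mapping above nests the fields under a `_doc` type level. To my understanding, create-index requests on ElasticSearch 7.x are typeless by default, so the request body would omit that level. The Python sketch below builds such a body for the same three fields; `typeless_index_body` is a hypothetical helper, and sending the body to the cluster is left out.

```python
import json


def typeless_index_body(properties: dict) -> dict:
    """Build a create-index body without a mapping type level."""
    return {"mappings": {"properties": properties}}


body = typeless_index_body(
    {
        "name": {"type": "text"},
        "role_id": {"type": "long"},
        "c_time": {"type": "date"},
    }
)
print(json.dumps(body, indent=2))
```

    The field definitions are unchanged; only the `_doc` wrapper and the index-name wrapper are gone, since the index name goes into the request path rather than the body.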

    Single-table mapping: example SQL with functions or arithmetic

    select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

    A function field must be followed by an alias. Example ES mapping corresponding to this SQL:

    {
      "mytest_user": {
        "mappings": {
          "_doc": {
            "properties": {
              "name": {"type": "text"},
              "role_id": {"type": "long"},
              "c_time": {"type": "date"}
            }
          }
        }
      }
    }

    Multi-table mapping (one-to-one, many-to-one): example SQL

    select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a left join role b on b.id = a.role_id

    Note: the join here must be a left outer join, and the first table must be the main table!
    Example ES mapping corresponding to this SQL:

    {
      "mytest_user": {
        "mappings": {
          "_doc": {
            "properties": {
              "name": {"type": "text"},
              "role_id": {"type": "long"},
              "role_name": {"type": "text"},
              "c_time": {"type": "date"}
            }
          }
        }
      }
    }

    Multi-table mapping (one-to-many): example SQL

    select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a left join (select user_id, group_concat(label order by id desc separator ';') as labels from label group by user_id) c on c.user_id=a.id

    Note: the subquery after left join may involve only one table, which means it must not contain another subquery or a join!

    Example ES mapping corresponding to this SQL:

    {
      "mytest_user": {
        "mappings": {
          "_doc": {
            "properties": {
              "name": {"type": "text"},
              "role_id": {"type": "long"},
              "c_time": {"type": "date"},
              "labels": {"type": "text"}
            }
          }
        }
      }
    }

    Other types of SQL examples

    • geo type
    select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
    • Composite primary key
    select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
    • Array field
    select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a left join (select user_id, group_concat(label order by id desc separator ';') as labels from label group by user_id) c on c.user_id=a.id

    Used in the configuration:

    objFields:
      labels: array:;
    • Object field
    select a.id as _id, a.name, a.role_id, a.c_time, a.description from user a

    Used in the configuration:

    objFields:
      description: object

    Here the content of the a.description column is a JSON string.

    • Parent-child document index
      es/customer.yml
    ......
    esMapping:
      _index: customer
      _type: _doc
      _id: id
      relations:
        customer_order:
          name: customer
      sql: "select t.id, t.name, t.email from customer t"

    es/order.yml

    esMapping:
      _index: customer
      _type: _doc
      _id: _id
      relations:
        customer_order:
          name: order
          parent: customer_id
      sql: "select concat('oid_', t.id) as _id,
            t.customer_id,
            t.id as order_id,
            t.serial_code as order_serial,
            t.c_time as order_time
            from biz_order t"
      skips:
        - customer_id

    Mapping example:

    {
      "mappings": {
        "_doc": {
          "properties": {
            "id": {"type": "long"},
            "name": {"type": "text"},
            "email": {"type": "text"},
            "order_id": {"type": "long"},
            "order_serial": {"type": "text"},
            "order_time": {"type": "date"},
            "customer_order": {
              "type": "join",
              "relations": {"customer": "order"}
            }
          }
        }
      }
    }
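
    Returning to the objFields settings from the array-field and object-field examples above: as described there, `array:;` means the column value is a string separated by `;`, and `object` means the column holds a JSON string. The Python sketch below models that conversion on a single row. It is an illustration of the described behavior under those two rules only, not the adapter's actual code, and `apply_obj_fields` and the sample row are hypothetical.

```python
import json


def apply_obj_fields(row: dict, obj_fields: dict) -> dict:
    """Convert string columns into arrays or objects according to objFields rules."""
    doc = dict(row)
    for field, rule in obj_fields.items():
        value = doc.get(field)
        if value is None:
            continue  # nothing to convert, e.g. a left join without a match
        if rule.startswith("array:"):
            separator = rule[len("array:"):]
            doc[field] = value.split(separator)
        elif rule == "object":
            doc[field] = json.loads(value)
    return doc


# Hypothetical row as the mapping SQL might return it.
row = {"_id": 1, "labels": "red;green;blue", "description": '{"lang": "en"}'}
doc = apply_obj_fields(row, {"labels": "array:;", "description": "object"})
print(doc)
```

    The `labels` string becomes a three-element list and `description` becomes a nested object, which is the shape the document would take in the index under this reading of the two rules.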

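    Notes

    • In a multi-table mapping, the main-table row must be inserted. If only the child table is inserted and the main table is not, the data cannot be synced to ElasticSearch; conversely, if only the main table is inserted and the child table is not, the data can still be synced to ElasticSearch.
    • In a multi-table mapping, if the main-table row with its join ID has already been written, and a child-table row is afterwards modified so that its join ID becomes the ID written in the main table, that change is not synced to ElasticSearch.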
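
    The first note is consistent with ordinary left join semantics: documents are driven by rows of the main table, and joined tables only contribute extra fields. The Python sketch below models the user/role mapping from the many-to-one example in that way. It is a plain in-memory model for illustration, not how Canal processes binlog events, and `build_documents` and the sample rows are hypothetical.

```python
def build_documents(users: list, roles: list) -> list:
    """Model 'user a left join role b on b.id = a.role_id' as ES documents."""
    roles_by_id = {role["id"]: role for role in roles}
    docs = []
    for user in users:  # only main-table rows produce documents
        role = roles_by_id.get(user["role_id"])
        docs.append(
            {
                "_id": user["id"],
                "name": user["name"],
                "role_id": user["role_id"],
                # A missing joined row leaves the field empty instead of dropping the document.
                "role_name": role["role_name"] if role else None,
            }
        )
    return docs


only_child = build_documents([], [{"id": 1, "role_name": "admin"}])
only_main = build_documents([{"id": 7, "name": "alice", "role_id": 1}], [])
print(only_child)
print(only_main)
```

    With only the child table populated the result is empty, and with only the main table populated there is one document whose `role_name` is empty, which mirrors the behavior described in the first note.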
