日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

MySQL -> ES 数据同步 配置步骤

發(fā)布時間:2024/4/15 数据库 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MySQL -> ES 数据同步 配置步骤 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

部署 MySQL -> ES 數(shù)據(jù)同步

(mysql 同步到 ES 是支持 多表查詢 后把結果同步到ES 中的同一個索引中的)

1.服務器:

內網(wǎng)ip:192.168.0.60
登錄name+key
實現(xiàn)功能: canal、kafka、es7;canal同步預生產(chǎn)polardb數(shù)據(jù)到es7

2.ES7 kafka服務

es web 管理url:
http://[internet ip]:9800
es:
公網(wǎng): [internet ip] 9201
內網(wǎng): 192.168.0.60 9201

kafka:
kafka-manage
http://[internet ip]:9001

kafka:
公網(wǎng): [internet ip] 9092
內網(wǎng): 192.168.0.60 9092
zookeeper:
192.168.0.60 2181

##3.ES 同步相關文件目錄如下:

見文章最后

服務器部署列表:

applicationip: portinstall diruser/psd
mysql[mysql_server_ip]:3306rdscanal / 123456
zookeeper192.168.0.60:2181/opt/app/zookeeper-3.4.12#zookeeper
canal.deployer192.168.0.60:1111/opt/app/canal.deployer#canal
canal.adapter192.168.0.60:8081/opt/app/canal.adapter#canal
ES192.168.0.60:9201DOCKERes
kafka192.168.0.60:9092DOCKERkafka

1.安裝zookeeper.

配置文件: vi conf/zoo.cfg 主要參數(shù):initLimit=10syncLimit=5clientPort=2181dataDir=/opt/app/zookeeper-3.4.12/datavi conf/log4j.properties #日志類配置zookeeper.log.dir=.zookeeper.log.file=zookeeper.logzookeeper.log.threshold=DEBUGzookeeper.tracelog.dir=.zookeeper.tracelog.file=zookeeper_trace.log啟動zookeeper# ./zkServer.sh start ../conf/zoo.cfgZooKeeper JMX enabled by defaultUsing config: ../conf/zoo.cfgStarting zookeeper ... STARTED在zookeeper 中查看同步canal 的信息:zkCli.sh -server localhost:2181ls /otterget /otter/canal/destinations/crm_canal/2/cursor

2.安裝配置canal.deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gzcanal,adapter 最好下載同版本。adapter1.1.5 插件需要更新為:client-adapter.es7x-1.1.5-jar-with-dependencies.jar 相關說明如下: canal 1.1.5本身有個Bug存在(https://github.com/alibaba/canal/issues/3636),需要手動替換一個插件mkdir /opt/app/canal.adapter /opt/app/canal.deployer tar zxvf /opt/download/canal.deployer-1.1.5.tar.gz -C /opt/app/canal.deployer tar zxvf /opt/download/canal.adapter-1.1.5.tar.gz -C /opt/app/canal.adapter

2.1 canal deployer 配置文件主要參數(shù):

vi conf/canal.properties # canal.id = 1 #如果是集群,編號要不相同canal.ip = 192.168.0.60 #本地IPcanal.port = 11111 #端口canal.metrics.pull.port = 11112canal.zkServers = 192.168.0.60 #zookeeper 服務器,這里是本地canal.serverMode = kafka #同步方式,kafka 數(shù)據(jù)流的方式canal.destinations = shop #同步后的目的地名稱(在mq中可以查看到)kafka.bootstrap.servers = 192.168.0.60:9092

2.1 canal instance 配置文件主要參數(shù):

cd /opt/app/canal.deplyer/conf
cp example/instance.properties ./shop/
vi shop/instance.properties

# enable gtid use true/falsecanal.instance.gtidon=true# position infocanal.instance.master.address=192.168.0.25:3306canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp=canal.instance.master.gtid=truecanal.instance.dbUsername=canalcanal.instance.dbPassword=123456canal.instance.connectionCharset = UTF-8# mq configcanal.mq.topic=SYNC_ES_shop# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*canal.mq.partition=0

mysql數(shù)據(jù)庫相關配置

canal.instance.master.address=mysql_server_ip:3306 canal.instance.dbUsername=canal canal.instance.dbPassword= [密碼查看其它文件] 待同步數(shù)據(jù)表 canal.instance.filter.regex=shop.tb_building,shop.tb_article,shop.tb_home_news,shop.tb_home_store_product,shop. tb_travel_product 指定topic canal.mq.topic=SYNC_ES_SHOP

2.2 adapter 參數(shù)配置

vi conf/application.yml

canal.conf:
mode: kafka #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:127.0.0.1:2181
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

srcDataSources:defaultDS:url: jdbc:mysql://mysql_server_ip:3306/mytest?useUnicode=trueusername: sync_user password: 123456canalAdapters:- instance: SYNC_ES_shop # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: es7key: exampleKey#hosts: localhost:9201 # 127.0.0.1:9200 for rest mode ~~#**注意: 此方式無法訪問ES ERROR: Illegal character in scheme name at index 0: 127.0.0.1:9201****~~hosts: http://192.168.0.60:9201 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or restcluster.name: docker-cluster
3.2 開啟實時同步、全量同步數(shù)據(jù)到ES

1.啟動canal-server
2.啟動canal-adapter

3.建立ES 索引

cd /opt/json/es_index_json_file/curl -XPUT -H "Content-Type: application/json" http://127.0.0.1:9201/tb_home_store_product?include_type_name=true -d "@tb_home_store_product.json"curl -XPUT -H "Content-Type: application/json" http://127.0.0.1:9201/tb_travel_product?include_type_name=true -d "@tb_travel_product.json"#查看ES中的索引curl -XGET http://127.0.0.1:9201/_cat/indices?v#刪除索引#curl -XDELETE http://127.0.0.1:9201/tb_building

4.全量同步數(shù)據(jù)到ES

curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_home_store_product.yml curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_travel_product.yml

ES web 查詢 頁面:

http://[internet ip]:9800/
連接內部 ESDH IP: http://192.168.0.60:9201

#以下文件內容在進行同步時,拷貝另存為對應的文件即可

****************************************** tb_home_store_product.json file ***************************************************************

tb_home_store_product.json file

{"mappings":{"home_store_product_doc":{"properties":{"id": {"type": "integer"},"store_name": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"store_info": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"content": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"mark_price": {"type": "double"},"ot_price": {"type": "double"},"sales": {"type": "long"},"ficti": {"type": "long"},"create_by": {"type": "keyword"},"create_time": {"type": "date"},"update_by": {"type": "keyword"},"update_time": {"type": "date"} } } } }

****************************************** tb_home_store_product.yml file ***************************************************************

dataSourceKey: defaultDS destination: SYNC_ES_SHOP_TEST outerAdapterKey: exampleKey groupId: g1 esMapping:_index: tb_home_store_product_id: idsql: "select id,store_name,store_info,content,mark_price,ot_price,sales,ficti,is_show,del_flag,img_size,image,create_by,create_time,update_by,update_time from tb_home_store_product t"etlCondition: "where t.id>={0} and t.id<={1}"commitBatch: 3000

****************************************** tb_travel_product.json file ***************************************************************

{"mappings":{"travel_product_doc":{"properties":{"id": {"type": "integer"},"on_sale": {"type": "integer"},"product_name": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"product_shortname": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"product_price": {"type": "double"},"product_img": {"type": "keyword"},"sale_count": {"type": "long"},"create_by": {"type": "keyword"},"create_time": {"type": "date"},"update_by": {"type": "keyword"},"update_time": {"type": "date"} } } } }

************************************************** tb_travel_product.yml file *******************************************************

###說明:這里可以看出,這個查詢是2個表join 后的結果,說明是可以2個表導出到ES 為一個索引的。

dataSourceKey: defaultDS destination: SYNC_ES_SHOP_TEST outerAdapterKey: exampleKey groupId: g1 esMapping:_index: tb_travel_product_id: idsql: "SELECT t.id as id,t.product_no, t.on_sale, t.product_name, t.product_shortname,t.product_price, i. product_img, t.sale_count, t.create_by, t.create_time, t.update_by, t.update_time from tb_travel_product t left join tb_travel_product_info i on i.product_no = t.product_no"etlCondition: "where t.id>={0} and t.id<={1}"commitBatch: 3000

總結

以上是生活随笔為你收集整理的MySQL -> ES 数据同步 配置步骤的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。