MySQL -> ES Data Sync: Configuration Steps
Deploying MySQL -> ES data synchronization
(MySQL-to-ES sync supports multi-table queries: the joined result set can be synced into a single index in ES.)
1. Server
Internal IP: 192.168.0.60
Login: name + key
Role: canal, kafka, es7; canal syncs pre-production PolarDB data into ES7
2. ES7 / Kafka services
ES web management URL:
http://[internet ip]:9800
es:
Public: [internet ip] 9201
Internal: 192.168.0.60 9201
kafka:
kafka-manage
http://[internet ip]:9001
kafka:
Public: [internet ip] 9092
Internal: 192.168.0.60 9092
zookeeper:
192.168.0.60 2181
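Before wiring up canal, it is worth confirming that ES is reachable on the addresses above; the standard _cat endpoints are enough for a quick check:

curl -XGET "http://192.168.0.60:9201/_cat/health?v"
curl -XGET "http://192.168.0.60:9201/_cat/nodes?v"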
3. ES sync-related file directory:
See the end of this article.
Server deployment list:

| Service | Address | Install path / deployment | Account / notes |
| --- | --- | --- | --- |
| mysql | [mysql_server_ip]:3306 | rds | canal / 123456 |
| zookeeper | 192.168.0.60:2181 | /opt/app/zookeeper-3.4.12 | #zookeeper |
| canal.deployer | 192.168.0.60:11111 | /opt/app/canal.deployer | #canal |
| canal.adapter | 192.168.0.60:8081 | /opt/app/canal.adapter | #canal |
| ES | 192.168.0.60:9201 | DOCKER | es |
| kafka | 192.168.0.60:9092 | DOCKER | kafka |
1. Install Zookeeper

Main parameters of the config file:

vi conf/zoo.cfg
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/opt/app/zookeeper-3.4.12/data

vi conf/log4j.properties   # logging configuration
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

Start zookeeper:

# ./zkServer.sh start ../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo.cfg
Starting zookeeper ... STARTED

Inspect the canal sync information stored in Zookeeper:

zkCli.sh -server localhost:2181
ls /otter
get /otter/canal/destinations/crm_canal/2/cursor
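A quick sanity check that Zookeeper is actually serving before moving on (the second command assumes nc is installed):

./zkServer.sh status                 # should report Mode: standalone for a single node
echo ruok | nc 192.168.0.60 2181     # should answer imok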
2. Install and configure canal.deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

canal.deployer and canal.adapter should be downloaded at the same version. For adapter 1.1.5 the ES7 plugin needs to be replaced with client-adapter.es7x-1.1.5-jar-with-dependencies.jar: canal 1.1.5 ships with a bug (https://github.com/alibaba/canal/issues/3636), so this plugin has to be swapped in manually.

mkdir /opt/app/canal.adapter /opt/app/canal.deployer
tar zxvf /opt/download/canal.deployer-1.1.5.tar.gz -C /opt/app/canal.deployer
tar zxvf /opt/download/canal.adapter-1.1.5.tar.gz -C /opt/app/canal.adapter
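A sketch of the plugin swap mentioned above; plugin/ is the standard adapter jar directory in canal.adapter, and the download path is only an example:

cd /opt/app/canal.adapter/plugin
# keep the jar shipped with 1.1.5 as a backup, then drop in the fixed build
mv client-adapter.es7x-1.1.5-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar.bak
cp /opt/download/client-adapter.es7x-1.1.5-jar-with-dependencies.jar .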
2.1 Main parameters of the canal.deployer config file:

vi conf/canal.properties

# canal.id = 1                     # in a cluster, each node's id must be different
canal.ip = 192.168.0.60            # local IP
canal.port = 11111                 # port
canal.metrics.pull.port = 11112
canal.zkServers = 192.168.0.60     # zookeeper server (local in this setup)
canal.serverMode = kafka           # delivery mode: stream changes through kafka
canal.destinations = shop          # destination name (visible in the MQ)
kafka.bootstrap.servers = 192.168.0.60:9092

2.2 Main parameters of the canal instance config file:
cd /opt/app/canal.deployer/conf
mkdir -p shop
cp example/instance.properties ./shop/
vi shop/instance.properties

MySQL database-related settings:
canal.instance.master.address=mysql_server_ip:3306
canal.instance.dbUsername=canal
# password: see the separate credentials file
canal.instance.dbPassword=

# tables to sync
canal.instance.filter.regex=shop.tb_building,shop.tb_article,shop.tb_home_news,shop.tb_home_store_product,shop.tb_travel_product

# target topic
canal.mq.topic=SYNC_ES_SHOP
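Once canal.deployer is running (step 3 below), a simple way to confirm that change events actually reach this topic is Kafka's console consumer (run it inside the Kafka container or from any host with the Kafka CLI tools):

kafka-console-consumer.sh --bootstrap-server 192.168.0.60:9092 --topic SYNC_ES_SHOP --from-beginning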
2.3 Adapter configuration

vi conf/application.yml

canal.conf:
  mode: kafka # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
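The block above only covers the MQ consumer settings. For the adapter to actually write into ES7, application.yml also needs a source data source and an es7 outer adapter entry. A minimal sketch, assuming the shop schema, the canal account from the deployment table, and the exampleKey referenced by the ETL calls later; url, password, hosts and cluster.name must be adapted to the real environment:

  srcDataSources:
    defaultDS:                             # referenced by dataSourceKey in the es7 mapping files
      url: jdbc:mysql://[mysql_server_ip]:3306/shop?useUnicode=true
      username: canal
      password: 123456
  canalAdapters:
  - instance: SYNC_ES_SHOP                 # kafka topic; must match the destination in the es7 mapping files
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        key: exampleKey
        hosts: 192.168.0.60:9201           # rest mode endpoint; use port 9300 for transport mode
        properties:
          mode: rest                       # or transport
          cluster.name: elasticsearch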
3. Enable real-time sync and run a full sync to ES
1. Start canal-server
2. Start canal-adapter
3. Create the ES indices
cd /opt/json/es_index_json_file/

curl -XPUT -H "Content-Type: application/json" "http://127.0.0.1:9201/tb_home_store_product?include_type_name=true" -d "@tb_home_store_product.json"
curl -XPUT -H "Content-Type: application/json" "http://127.0.0.1:9201/tb_travel_product?include_type_name=true" -d "@tb_travel_product.json"

# list the indices in ES
curl -XGET "http://127.0.0.1:9201/_cat/indices?v"

# delete an index
# curl -XDELETE http://127.0.0.1:9201/tb_building
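To confirm the mappings were created exactly as defined in the JSON files, they can be read back with the standard mapping API (shown here for one index):

curl -XGET "http://127.0.0.1:9201/tb_home_store_product/_mapping?pretty"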
4. Full sync of existing data to ES

curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_home_store_product.yml
curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_travel_product.yml

ES web query page:
http://[internet ip]:9800/
It connects to the internal ES at: http://192.168.0.60:9201
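After the full sync finishes, document counts and a sample query are a quick way to confirm that data actually landed in the indices:

curl -XGET "http://127.0.0.1:9201/tb_home_store_product/_count?pretty"
curl -XGET "http://127.0.0.1:9201/tb_travel_product/_search?size=1&pretty"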
# When setting up the sync, copy the file contents below and save each as the corresponding file.
****************************************** tb_home_store_product.json file ***************************************************************
{
  "mappings": {
    "home_store_product_doc": {
      "properties": {
        "id": { "type": "integer" },
        "store_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "store_info": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "content": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "mark_price": { "type": "double" },
        "ot_price": { "type": "double" },
        "sales": { "type": "long" },
        "ficti": { "type": "long" },
        "create_by": { "type": "keyword" },
        "create_time": { "type": "date" },
        "update_by": { "type": "keyword" },
        "update_time": { "type": "date" }
      }
    }
  }
}

****************************************** tb_home_store_product.yml file ***************************************************************
dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:
  _index: tb_home_store_product
  _id: id
  sql: "select id,store_name,store_info,content,mark_price,ot_price,sales,ficti,is_show,del_flag,img_size,image,create_by,create_time,update_by,update_time from tb_home_store_product t"
  etlCondition: "where t.id>={0} and t.id<={1}"
  commitBatch: 3000

****************************************** tb_travel_product.json file ***************************************************************
{
  "mappings": {
    "travel_product_doc": {
      "properties": {
        "id": { "type": "integer" },
        "on_sale": { "type": "integer" },
        "product_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "product_shortname": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "product_price": { "type": "double" },
        "product_img": { "type": "keyword" },
        "sale_count": { "type": "long" },
        "create_by": { "type": "keyword" },
        "create_time": { "type": "date" },
        "update_by": { "type": "keyword" },
        "update_time": { "type": "date" }
      }
    }
  }
}

************************************************** tb_travel_product.yml file *******************************************************
### Note: as the SQL here shows, the query joins two tables, which confirms that two tables can be exported into a single ES index.
dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:
  _index: tb_travel_product
  _id: id
  sql: "SELECT t.id as id, t.product_no, t.on_sale, t.product_name, t.product_shortname, t.product_price, i.product_img, t.sale_count, t.create_by, t.create_time, t.update_by, t.update_time from tb_travel_product t left join tb_travel_product_info i on i.product_no = t.product_no"
  etlCondition: "where t.id>={0} and t.id<={1}"
  commitBatch: 3000

Summary

With canal + kafka + canal-adapter, MySQL changes are streamed into ES7 in near real time, and as the tb_travel_product mapping above shows, the result of a multi-table join can be synced into a single ES index.