日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程

發(fā)布時間:2025/3/20 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

點擊上方“Java知音”,選擇“置頂公眾號”

技術文章第一時間送達!

上一期講到了通過canal訂閱mysql的binlog日志并且轉(zhuǎn)換為對象,那么這一次我們將訂閱來的對象通過RocketMQ發(fā)送消息,接收方接受消息之后同時存儲到其他類型的數(shù)據(jù)源當中,完成一個簡單的數(shù)據(jù)異構的過程。

什么是Java消息服務?

兩個應用程序之間進行異步通信的API,它為標準消息協(xié)議和消息服務提供了一組通用接口,包括創(chuàng)建、發(fā)送、讀取消息等,用于支持JAVA應用程序開發(fā)。

在J2EE中,當兩個應用程序使用JMS進行通信時,它們之間并不是直接相連的,而是通過一個共同的消息收發(fā)服務連接起來,可以達到解耦的效果,我們將會在接下來的教程中詳細介紹。

jms的消息傳送模型

常見的消息傳送模型有以下兩種:

點對點消息傳送模型

在點對點消息傳送模型中,應用程序由消息隊列,發(fā)送者,接收者組成。每一個消息發(fā)送給一個特殊的消息隊列,該隊列保存了所有發(fā)送給它的消息(除了被接收者消費掉的和過期的消息)。如下圖所示:

發(fā)布訂閱消息傳送模型

在發(fā)布訂閱模型中,消費者需要訂閱相關的topic才能接收到生產(chǎn)者的信息。生產(chǎn)者會將信息傳輸?shù)絫opic中,然后消費者只需要從topic中獲取數(shù)據(jù)即可。如下圖所示:

RocketMQ消息隊列使用

這次使用的消息中間件為RocketMQ的使用場景。RocketMQ是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給Apache基金會,并于2016年11月成為 Apache 孵化項目。

RocketMQ在使用之前,需要我們引入相關的依賴配置:

????
????????<dependency>
????????????<groupId>org.apache.rocketmqgroupId>
????????????<artifactId>rocketmq-clientartifactId>
????????????<version>${rocketmq.version}version>
????????dependency>

關于RocketMQ的安裝在這里就不做過多的講解了。

通過mq的方式來進行數(shù)據(jù)異構通常是比較簡單的方案,首先我們需要在項目里面獨立一個模塊專門用于監(jiān)聽mysql的binlog日志,這個模塊我暫且稱之為datahandle-core模塊

整個工程采用了springboot的結構來構建,主要的核心也是在core工程中。

首先是監(jiān)聽canal的日志狀態(tài)模塊了,采用了上一節(jié)中講解到的客戶端代碼進行數(shù)據(jù)監(jiān)聽,并且將其轉(zhuǎn)換為對象然后發(fā)送往mq中:

package?com.sise.datahandle.core;

import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.Message;
import?lombok.extern.slf4j.Slf4j;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.CommandLineRunner;
import?org.springframework.stereotype.Component;

import?java.net.InetSocketAddress;

import?static?com.sise.datahandle.constants.CanalConstants.*;

/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?CanalListener?implements?CommandLineRunner?{

????@Autowired
????private?CanalClient?canalClient;

????@Override
????public?void?run(String...?args)?throws?Exception?{
??????log.info("=============canal監(jiān)聽器開啟===============");
????????CanalConnector?canalConnector?=?CanalConnectors.newSingleConnector(
????????????????new?InetSocketAddress(SERVER_ADDRESS,?PORT),?DESTINATION,?USERNAME,?PASSWORD);
????????canalConnector.connect();
????????canalConnector.subscribe(".*\\..*");
????????canalConnector.rollback();
????????for?(;?;?)?{
????????????Message?message?=?canalConnector.getWithoutAck(100);
????????????long?batchId?=?message.getId();
????????????if?(batchId?!=?-1)?{
????????????????canalClient.entityHandle(message.getEntries());
????????????}
????????}
????}
}

ps:這里面的CanalClient代碼主要來自上一篇的canal客戶端代碼,文末會有完整項目代碼鏈接,需要的讀者可以前往查看。

在CanalClient里面,有一個函數(shù)是專門用于處理將訂閱的數(shù)據(jù)發(fā)送到mq消息隊列中:

package?com.sise.datahandle.core;

import?com.alibaba.fastjson.JSON;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.google.protobuf.InvalidProtocolBufferException;
import?com.sise.datahandle.handler.CanalDataHandler;
import?com.sise.datahandle.model.TypeDTO;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.exception.MQBrokerException;
import?org.apache.rocketmq.client.exception.MQClientException;
import?org.apache.rocketmq.client.producer.DefaultMQProducer;
import?org.apache.rocketmq.client.producer.SendResult;
import?org.apache.rocketmq.remoting.exception.RemotingException;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Service;

import?java.util.List;

/**
?*?canal監(jiān)聽客戶端變化
?*
?*?@author?idea
?*?@date?2019/10/12
?*/
@Slf4j
@Service
public?class?CanalClient?{


????@Autowired
????private?DefaultMQProducer?rocketMqProducer;


????/**
?????*?處理binlog日志的監(jiān)聽
?????*
?????*?@param?entries
?????*/
????public?void?entityHandle(List?entries)?{
????????for?(CanalEntry.Entry?entry?:?entries)?{
????????????if?(entry.getEntryType()?!=?CanalEntry.EntryType.ROWDATA)?{
????????????????continue;
????????????}
????????????try?{
????????????????CanalEntry.RowChange?rowChange?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????????for?(CanalEntry.RowData?rowData?:?rowChange.getRowDatasList())?{
????????????????????switch?(rowChange.getEventType())?{
????????????????????????case?INSERT:
????????????????????????????String?tableName?=?entry.getHeader().getTableName();
????????????????????????????//測試選用t_type這張表進行映射處理
????????????????????????????if?("t_type".equals(tableName))?{
????????????????????????????????TypeDTO?typeDTO?=?CanalDataHandler.convertToBean(rowData.getAfterColumnsList(),?TypeDTO.class);
????????????????????????????????org.apache.rocketmq.common.message.Message?message?=?new?org.apache.rocketmq.common.message.Message();
????????????????????????????????message.setTopic("canal-test-topic");
????????????????????????????????message.setTags("canal-test-tag");
????????????????????????????????String?json?=?JSON.toJSONString(typeDTO);
????????????????????????????????message.setBody(json.getBytes());
????????????????????????????????SendResult?sendResult?=?rocketMqProducer.send(message);
????????????????????????????????log.info("[mq消息發(fā)送結果]----"?+?sendResult);
????????????????????????????}
????????????????????????????break;
????????????????????????default:
????????????????????????????break;
????????????????????}
????????????????}
????????????}?catch?(InvalidProtocolBufferException?e)?{
????????????????log.error("[CanalClient]監(jiān)聽數(shù)據(jù)過程出現(xiàn)異常,異常信息為{}",?e);
????????????}?catch?(InterruptedException?|?RemotingException?|?MQClientException?|?MQBrokerException?e)?{
????????????????log.error("[CanalClient]?mq發(fā)送信息出現(xiàn)異常:{}",?e);
????????????}
????????}
????}

}

這里面主要是監(jiān)聽binlog記錄為插入數(shù)據(jù)事件的時候做發(fā)送mq操作。

接下來便是常見的mq配置了,本工程主要是一個模擬的簡單案例,因此我將consumer和producer都放在了一起方便測試。

通過springboot自身的properties文件對mq進行參數(shù)初始化配置之后便可以構建一個基本的consumer和producer了。這里我們拿一個TypeDto類來進行樹異構的測試,consumer端的核心代碼為:

package?com.sise.datahandle.mq.rocketmq.consumer;

import?com.sise.datahandle.model.TypeDTO;
import?com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle;
import?com.sise.datahandle.redis.RedisService;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import?org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import?org.apache.rocketmq.common.message.MessageExt;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
import?org.springframework.util.CollectionUtils;

import?java.util.List;

/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?RocketMqConsumeMsgListenerProcessor?implements?MessageListenerConcurrently?{

????@Autowired
????private?RedisService?redisService;

????@Override
????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context)?{if(CollectionUtils.isEmpty(msgs)){
????????????log.info("接受到的消息為空,不處理,直接返回成功");return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????????MessageExt?messageExt?=?msgs.get(0);
????????System.out.println("接受到的消息為:"+messageExt.toString());if("canal-test-topic".equals(messageExt.getTopic())){if("canal-test-tag".equals(messageExt.getTags())){
????????????????int?reconsume?=?messageExt.getReconsumeTimes();if(reconsume?==3){//消息已經(jīng)重試了3次,如果不需要再次消費,則返回成功return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????????}
????????????????TypeDTO?typeDTO?=?RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class);//存儲進入redis中
????????????????redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO);
????????????}
????????}//?如果沒有return?success?,consumer會重新消費該消息,直到return?successreturn?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????}
}

通過訂閱mq的信息,讀取相關的數(shù)據(jù)再次寫入到redis里面,完成一個簡單過程的數(shù)據(jù)異構。

整個迷你工程寫下來,比較核心的地方就在于對binlog日志的解析器部分,如何將日志訂閱之后轉(zhuǎn)換為相應的對象進行處理。

通常采用mq的方式進行數(shù)據(jù)異構會相對簡單,實際上是在監(jiān)聽binlog為寫DB的同時去寫一次MQ,但是這種方式不能夠保證數(shù)據(jù)一致性,就是不能保證跨資源的事務。注:調(diào)用第三方遠程RPC的操作一定不要放到事務中。

完整案例的代碼鏈接如下(點擊閱讀原文直達):

https://gitee.com/IdeaHome_admin/wfw

推薦閱讀(點擊即可跳轉(zhuǎn)閱讀)

1.SpringBoot內(nèi)容聚合

2.面試題內(nèi)容聚合

3.設計模式內(nèi)容聚合

4.Mybatis內(nèi)容聚合

5.多線程內(nèi)容聚合

覺得不錯?歡迎轉(zhuǎn)發(fā)分享給更多人

我知道你 “在看

總結

以上是生活随笔為你收集整理的canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 蜜臀在线一区二区三区 | 一区精品在线观看 | 久草视频在线免费播放 | 天天操天天射天天 | www在线播放 | 国产一级特黄 | 91精品国产成人www | 97人人澡人人爽人人模亚洲 | 久久99精品久久久水蜜桃 | 亚洲精品视频大全 | 精品一区二区三区在线免费观看 | 色欲狠狠躁天天躁无码中文字幕 | 国内精品视频在线观看 | 精品少妇人妻一区二区黑料社区 | 男人的天堂网在线 | 中文字幕在线观看国产 | 91视频看片 | 揄拍成人国产精品视频 | 大尺度叫床戏做爰视频 | 国产伦精品免费视频 | 五月天色站 | 国产一区视频在线播放 | 在线视频这里只有精品 | av网站大全免费 | 天天草影院 | 国产精品一线天粉嫩av | 亚洲综合五月天婷婷丁香 | 黄色福利网 | 久久不射电影网 | 久久久久久国产精品视频 | 男生尿隔着内裤呲出来视频 | av自拍网 | 中日韩午夜理伦电影免费 | 玉势 (1v1 高h) | 加勒比色综合 | 亚洲乱码一区二区三区 | 久久成人免费视频 | 日韩av免费 | 一级空姐毛片 | 日韩精品一区二区三区中文字幕 | 欧美激情区 | 亚洲aaaaaaa | 综合一区在线 | 日皮视频在线观看 | 成人av激情 | 中文字幕一二三 | 久久人人爽人人爽人人片亚洲 | 国产成人无码一二三区视频 | 国产区在线视频 | 久久密桃 | 免费av在线 | 国产精品69毛片高清亚洲 | 91国产在线看| 欧美精品首页 | 双乳被四个男人吃奶h文 | 免费a v在线 | 优优色综合| 亚洲精品视频在线播放 | 国产做爰xxxⅹ高潮视频12p | 日本人jizz | 热re99久久精品国产99热 | 另类小说五月天 | 精品视频日韩 | 国产中文字幕乱人伦在线观看 | 欧美人禽杂交狂配 | 琪琪免费视频 | 巨大胸大乳奶电影 | 天天做天天操 | 日本不卡一区 | 国产在线综合网 | 欧美在线 | 亚洲 | 欧美成人午夜精品免费 | 久久久av网站 | 93久久精品日日躁夜夜躁欧美 | 精品国产免费人成在线观看 | 国产黄在线免费观看 | 四级黄色片| 人人97| 欧美久久久久久又粗又大 | 中文字幕日韩精品在线观看 | 99久久久久久 | 亚洲国产大片 | 日本簧片在线观看 | 国内精品国产三级国产aⅴ久 | 中文字幕永久视频 | 91手机在线 | 人妻体内射精一区二区 | 好色艳妇小说 | 日韩在线精品 | 成人毛片a | 国产伦精品一区二区三区88av | 天天干天天色 | www.中文字幕在线观看 | 久久黄色小说 | 国产羞羞 | 亚洲av无码一区二区三区在线 | 美女视频黄色免费 | 中文字幕第27页 | 国产一区二区在线视频观看 |