日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

使用rabbitMQ实现数据同步

發(fā)布時(shí)間:2024/4/13 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用rabbitMQ实现数据同步 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

思路分析

發(fā)送方:商品微服務(wù)

  • 什么時(shí)候發(fā)?

    當(dāng)商品服務(wù)對(duì)商品進(jìn)行寫(xiě)操作:增、刪、改的時(shí)候,需要發(fā)送一條消息,通知其它服務(wù)。

  • 發(fā)送什么內(nèi)容?

    對(duì)商品的增刪改時(shí)其它服務(wù)可能需要新的商品數(shù)據(jù),但是如果消息內(nèi)容中包含全部商品信息,數(shù)據(jù)量太大,而且并不是每個(gè)服務(wù)都需要全部的信息。因此我們只發(fā)送商品id,其它服務(wù)可以根據(jù)id查詢(xún)自己需要的信息。

接收方:搜索微服務(wù)、靜態(tài)頁(yè)微服務(wù)

接收消息后如何處理?

  • 搜索微服務(wù):

    • 增/改:添加新的數(shù)據(jù)到索引庫(kù)

    • 刪:刪除索引庫(kù)數(shù)據(jù)

  • 靜態(tài)頁(yè)微服務(wù):

    • 增/改:創(chuàng)建新的靜態(tài)頁(yè)

    • 刪:刪除原來(lái)的靜態(tài)頁(yè)

商品服務(wù)發(fā)送消息

我們先在商品微服務(wù)learn-item-service中實(shí)現(xiàn)發(fā)送消息。

引入依賴(lài)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

配置文件

我們?cè)赼pplication.yml中添加一些有關(guān)RabbitMQ的配置:

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learntemplate:exchange: learn.item.exchangepublisher-confirms: true
  • template:有關(guān)AmqpTemplate的配置

    • exchange:缺省的交換機(jī)名稱(chēng),此處配置后,發(fā)送消息如果不指定交換機(jī)就會(huì)使用這個(gè)

  • publisher-confirms:生產(chǎn)者確認(rèn)機(jī)制,確保消息會(huì)正確發(fā)送,如果發(fā)送失敗會(huì)有錯(cuò)誤回執(zhí),從而觸發(fā)重試

改造GoodsService

在GoodsService中封裝一個(gè)發(fā)送消息到mq的方法:(需要注入AmqpTemplate模板)

private void sendMessage(Long id, String type){// 發(fā)送消息try {this.amqpTemplate.convertAndSend("item." + type, id);} catch (Exception e) {logger.error("{}商品消息發(fā)送異常,商品id:{}", type, id, e);} }

這里沒(méi)有指定交換機(jī),因此默認(rèn)發(fā)送到了配置中的:leyou.item.exchange

注意:這里要把所有異常都try起來(lái),不能讓消息的發(fā)送影響到正常的業(yè)務(wù)邏輯

?

然后在新增的時(shí)候調(diào)用:

修改的時(shí)候調(diào)用:

搜索服務(wù)接收消息

搜索服務(wù)接收到消息后要做的事情:

  • 增:添加新的數(shù)據(jù)到索引庫(kù)

  • 刪:刪除索引庫(kù)數(shù)據(jù)

  • 改:修改索引庫(kù)數(shù)據(jù)

因?yàn)樗饕龓?kù)的新增和修改方法是合二為一的,因此我們可以將這兩類(lèi)消息一同處理,刪除另外處理。

引入依賴(lài)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

添加配置

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn

這里只是接收消息而不發(fā)送,所以不用配置template相關(guān)內(nèi)容。

編寫(xiě)監(jiān)聽(tīng)器

代碼:

@Component public class GoodsListener {@Autowiredprivate SearchService searchService;/*** 處理insert和update的消息** @param id* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創(chuàng)建或更新索引this.searchService.createIndex(id);}/*** 處理delete的消息** @param id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除索引this.searchService.deleteIndex(id);} }

編寫(xiě)創(chuàng)建和刪除索引方法

這里因?yàn)橐獎(jiǎng)?chuàng)建和刪除索引,我們需要在SearchService中拓展兩個(gè)方法,創(chuàng)建和刪除索引:

public void createIndex(Long id) throws IOException {Spu spu = this.goodsClient.querySpuById(id);// 構(gòu)建商品Goods goods = this.buildGoods(spu);// 保存數(shù)據(jù)到索引庫(kù)this.goodsRepository.save(goods); }public void deleteIndex(Long id) {this.goodsRepository.deleteById(id); }

創(chuàng)建索引的方法可以從之前導(dǎo)入數(shù)據(jù)的測(cè)試類(lèi)中拷貝和改造。

?

靜態(tài)頁(yè)服務(wù)接收消息

商品靜態(tài)頁(yè)服務(wù)接收到消息后的處理:

  • 增:創(chuàng)建新的靜態(tài)頁(yè)

  • 刪:刪除原來(lái)的靜態(tài)頁(yè)

  • 改:創(chuàng)建新的靜態(tài)頁(yè)并覆蓋原來(lái)的

不過(guò),我們編寫(xiě)的創(chuàng)建靜態(tài)頁(yè)的方法也具備覆蓋以前頁(yè)面的功能,因此:增和改的消息可以放在一個(gè)方法中處理,刪除消息放在另一個(gè)方法處理。

引入依賴(lài)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

添加配置

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn

這里只是接收消息而不發(fā)送,所以不用配置template相關(guān)內(nèi)容。

?

編寫(xiě)監(jiān)聽(tīng)器

代碼:

@Component public class GoodsListener {@Autowiredprivate GoodsHtmlService goodsHtmlService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創(chuàng)建頁(yè)面goodsHtmlService.createHtml(id);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除頁(yè)面goodsHtmlService.deleteHtml(id);} }

添加刪除頁(yè)面方法

public void deleteHtml(Long id) {File file = new File("C:\\project\\nginx-1.14.0\\html\\item\\", id + ".html");file.deleteOnExit(); }

測(cè)試

3.5.1.查看RabbitMQ控制臺(tái)

重新啟動(dòng)項(xiàng)目,并且登錄RabbitMQ管理界面:http://192.168.56.101:15672

可以看到,交換機(jī)已經(jīng)創(chuàng)建出來(lái)了:

隊(duì)列也已經(jīng)創(chuàng)建完畢:

并且隊(duì)列都已經(jīng)綁定到交換機(jī):

總結(jié)

以上是生活随笔為你收集整理的使用rabbitMQ实现数据同步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。