日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Stream如何处理消息重复消费

發布時間:2024/7/5 javascript 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Stream如何处理消息重复消费 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近收到好幾個類似的問題:使用Spring Cloud Stream操作RabbitMQ或Kafka的時候,出現消息重復消費的問題。通過溝通與排查下來主要還是用戶對消費組的認識不夠。其實,在之前的博文以及《Spring Cloud微服務實戰》一書中都有提到關于消費組的概念以及作用。

那么什么是消費組呢?為什么要用消費組?它解決什么問題呢?摘錄一段之前博文的內容,來解答這些疑問:

通常在生產環境,我們的每個服務都不會以單節點的方式運行在生產環境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理(出現上述重復消費問題)。但是有些業務場景之下,我們希望生產者產生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設置消費組來實現這樣的功能。

詳細也可查看原文:消息驅動的微服務(消費組)。

下面,通過一個例子來看看如何使用消費組:

問題重現

構建消息消費端

第一步:創建綁定接口,綁定example-topic輸入通道(默認情況下,會綁定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。

interface ExampleBinder {

String NAME = "example-topic";

@Input(NAME)
SubscribableChannel input();

}

第二步:對上述輸入通道創建監聽與處理邏輯。

@EnableBinding(ExampleBinder.class)
public class ExampleReceiver {

private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);

@StreamListener(ExampleBinder.NAME)
public void receive(String payload) {
logger.info("Received: " + payload);
}

}

第三步;創建應用主類和配置文件

@SpringBootApplication
public class ExampleApplication {

public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}

}
spring.application.name=stream-consumer-group
server.port=0

這里設置server.port=0,以方便在本地啟動多實例來重現問題。

完成上述操作之后,啟動兩個該應用的實例,以備后續調用。

構建消息生產端

比較簡單,需要注意的是,使用@Output創建一個同名的輸出綁定,這樣發出的消息才能被上述啟動的實例接收到。具體實現如下:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})
public class ExampleApplicationTests {

@Autowired
private ExampleBinder exampleBinder;

@Test
public void exampleBinderTester() {
exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
}

public interface ExampleBinder {

String NAME = "example-topic";

@Output(NAME)
MessageChannel output();

}

}

啟動上述測試用例之后,可以發現之前啟動的兩個實例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重復消費的問題成功重現!

使用消費組解決問題

如何解決上述消息重復消費的問題呢?我們只需要在配置文件中增加如下配置即可:

spring.cloud.stream.bindings.example-topic.group=aaa

當我們指定了某個綁定所指向的消費組之后,往當前主題發送的消息在每個訂閱消費組中,只會有一個訂閱者接收和消費,從而實現了對消息的負載均衡。只所以之前會出現重復消費的問題,是由于默認情況下,任何訂閱都會產生一個匿名消費組,所以每個訂閱實例都會有自己的消費組,從而當有消息發送的時候,就形成了廣播的模式。

另外,需要注意上述配置中example-topic是在代碼中@Output和@Input中傳入的名字。

代碼示例

本文示例讀者可以通過查看下面倉庫的中的stream-consumer-group項目:

  • Github
  • Gitee

如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!

以下專題教程也許您會有興趣

  • Spring Boot基礎教程
  • Spring Cloud基礎教程

總結

以上是生活随笔為你收集整理的Spring Cloud Stream如何处理消息重复消费的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 丝袜老师扒开让我了一夜漫画 | 十大黄台在线观看 | 天堂在线资源网 | 午夜不卡av免费 | 久久资源在线 | 国产亚洲一区在线 | 国产一级在线免费观看 | 毛片动态图 | 在线亚洲成人 | 国内性视频 | 国产极品久久 | 免费观看成年人视频 | 日韩网站免费观看 | 乱色专区 | 无码人妻精品一区二区三 | 亚洲永久精品视频 | 日韩少妇一区二区三区 | av噜噜在线观看 | 午夜伦伦电影理论片费看 | 波多野结衣av中文字幕 | 欧美日韩国产不卡 | freesex性hd公交车上 | 熟女俱乐部五十路六十路av | 国产高清视频在线观看 | 亚洲欧美韩国 | 国产精品午夜未成人免费观看 | 欧美伦理一区二区 | 欧美日韩国产麻豆 | 耳光调教vk | 德国老妇性猛交 | 亚洲欧美系列 | 成人黄色大片在线观看 | 韩国一区视频 | 精品av一区二区 | 天天综合一区 | jizz性欧美2 视频在线日韩 | 人人射视频| 国内爆初菊对白视频 | 男女视频网站 | 亚洲码中文 | 日韩黄色在线视频 | 一区二区蜜桃 | 一级黄色片免费看 | 激情a | xxxxhdvideos| 久久wwww| 欧美xxx视频 | 亚洲综合另类小说 | 国产日韩中文字幕 | 人妻洗澡被强公日日澡 | 日韩欧美亚洲一区二区 | 中文字幕一区二区人妻视频 | 亚洲高清在线观看 | 美女扒开尿口让男人桶 | 久久久精品一区二区三区 | 亚洲一二三四五 | 免费看黄色a级片 | 国产精品久久久久久久久借妻 | 欧美日韩一本 | 中文有码在线播放 | 黄一区二区三区 | 久久国产在线观看 | 国产欧美日韩专区发布 | 一级片免费 | 天堂中文在线官网 | 成人免费观看av | 日韩二区 | k8yy毛片| xiuxiuavnet| 岛国一区二区 | 无码h黄肉3d动漫在线观看 | 人人爽人人爽人人 | 糖心av | 成人激情免费视频 | 欧美美女在线观看 | 国产肉丝在线 | 黑丝一区| 精品国自产在线观看 | 大肉大捧一进一出好爽mba | 成人国产精品一区 | 少妇欧美激情一区二区三区 | 欧美特级黄 | 97看片吧| 韩国av一区二区三区 | 亚洲精品日日夜夜 | 欧美视频性 | 亚洲人人插| 国产看真人毛片爱做a片 | 吃奶在线观看 | 在线不卡欧美 | 色香影院| 亚洲激情婷婷 | 特种兵之深入敌后高清全集免费观看 | 天天摸夜夜| 搡老熟女老女人一区二区 | 黄色香蕉网 | 精品国产乱码久久久久久蜜臀 | 国产超碰人人爽人人做人人爱 | 秋霞国产午夜精品免费视频 |