日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息

發布時間:2024/9/18 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

今天要給大家介紹RocketMQ中的兩個功能,一個是“廣播”,這個功能是比較基礎的,幾乎所有的mq產品都是支持這個功能的;另外一個是“延遲消費”,這個應該算是RocketMQ的特色功能之一了吧。接下來,我們就分別看一下這兩個功能。

廣播

廣播是把消息發送給訂閱了這個主題的所有消費者。這個定義很清楚,但是這里邊的知識點你都掌握了嗎?咱們接著說“廣播”的機會,把消費者這端的內容好好和大家說說。

首先,消費者端的概念中,最大的應該是消費者組,一個消費者組中可以有多個消費者,這些消費者必須訂閱同一個Topic。

那么什么算是一個消費者呢?我們在寫消費端程序時,看到了setConsumeThreadMax這個方法,設置消費者的線程數,難道一個線程就是一個消費者?錯!這里的一個消費者是一個進程,你可以理解為ip+端口。如果在同一個應用中,你實例化了兩個消費者,這兩個消費者配置了相同的消費者組名稱,那么應用程序啟動時會報錯的,這里不給大家演示了,感興趣的小伙伴私下里試一下吧。

同一個消息,可以被不同的消費者組同時消費。假設,我有兩個消費者組cg-1和cg-2,這兩個消費者組訂閱了同一個Topic,那么這個Topic的消息會被cg-1和cg-2同時消費。那這是不是廣播呢?錯!當然不是廣播,廣播是同一個消費者組中的多個消費者都消費這個消息。如果配置的不是廣播,像前幾個章節中的那樣,一個消息只能被一個消費者組消費一次。

好了,說了這么多,我們實驗一下吧,先把消費者配置成廣播,如下:

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")

public DefaultMQPushConsumer broadcast() throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");

consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");

consumer.subscribe("cluster-topic","*");

consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for (MessageExt msg : msgs) {

System.out.println(new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

return consumer;

}

其中,NameServer,訂閱的Topic都沒有變化。

注意其中consumer.setMessageModel(MessageModel.BROADCASTING);這段代碼,設置消費者為廣播。咱們可以看一下,MessageModel枚舉中只有兩個值,BROADCASTING和CLUSTERING,默認為CLUSTERING。

因為要測試廣播,所以我們要啟動多個消費者,還記得什么是消費者嗎?對了,一個ip+端口算是一個消費者,在這里我們啟動兩個應用,端口分別是8080和8081。發送端的程序不變,如下:

@Test

public void producerTest() throws Exception {

for (int i = 0;i<5;i++) {

MessageExt message = new MessageExt();

message.setTopic("cluster-topic");

message.setKeys("key-"+i);

message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());

SendResult sendResult = defaultMQProducer.send(message);

System.out.println("i=" + i);

System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());

}

}

我們執行一下發送端的程序,日志如下:

i=0

BrokerName:broker-a

i=1

BrokerName:broker-a

i=2

BrokerName:broker-b

i=3

BrokerName:broker-b

i=4

BrokerName:broker-b

再來看看8080端口的應用后臺打印出來的日志:

消費了5個消息,再看看8081的后臺打印的日志,

也消費了5個。兩個消費者同時消費了消息,這就是廣播。有的小伙伴可能會有疑問了,如果不設置廣播,會怎么樣呢?私下里實驗一下吧,上面的程序中,只要把設置廣播的那段代碼注釋掉就可以了。運行的結果當然是只有一個消費者可以消費消息。

延遲消息

延遲消息是指消費者過了一個指定的時間后,才去消費這個消息。大家想象一個電商中場景,一個訂單超過30分鐘未支付,將自動取消。這個功能怎么實現呢?一般情況下,都是寫一個定時任務,一分鐘掃描一下超過30分鐘未支付的訂單,如果有則被取消。這種方式由于每分鐘查詢一下訂單,一是時間不精確,二是查庫效率比較低。這個場景使用RocketMQ的延遲消息最合適不過了,我們看看怎么發送延遲消息吧,發送端代碼如下:

@Test

public void producerTest() throws Exception {

for (int i = 0;i<1;i++) {

MessageExt message = new MessageExt();

message.setTopic("cluster-topic");

message.setKeys("key-"+i);

message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());

message.setDelayTimeLevel(2);

SendResult sendResult = defaultMQProducer.send(message);

System.out.println("i=" + i);

System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());

}

}

我們只是增加了一句message.setDelayTimeLevel(2);

為了方便,這次我們只發送一個消息。

setDelayTimeLevel是什么意思,設置的是2,難道是2s后消費嗎?怎么參數也沒有時間單位呢?如果我要自定義延遲時間怎么辦?我相信很多小伙伴都有這樣的疑問,我也是帶著這樣的疑問查了很多資料,最后在RocketMQ的Github官網上看到了說明,

在RocketMQ的源碼中,有一個MessageStoreConfig類,這個類中定義了延遲的時間,我們看一下,

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

我們在程序中設置的是2,那么這個消息將在5s以后被消費。

目前RocketMQ還不支持自定義延遲時間,延遲時間只能從上面的時間中選。如果你非要定義一個時間怎么辦呢?RocketMQ是開源的,下載代碼,把上面的時間改一下,再打包部署,就OK了。

再看看消費端的代碼,

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")

public DefaultMQPushConsumer broadcast() throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");

consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");

consumer.subscribe("cluster-topic","*");

consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for (MessageExt msg : msgs) {

Date now = new Date();

System.out.println("消費時間:"+now);

Date msgTime = new Date();

msgTime.setTime(msg.getBornTimestamp());

System.out.println("消息生成時間:"+msgTime);

System.out.println(new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

return consumer;

}

我們還是使用廣播的模式,沒有變。

打印出了當前的時間,這個時間就是消費的時間。

通過msg.getBornTimestamp()方法,獲得了消息的生成時間,也打印出來,看看是不是延遲5s。

啟動兩個消費者8080和8081,發送消息,再看看消費者的后臺日志,

消費時間:Thu Jun 11 14:45:53 CST 2020

消息生成時間:Thu Jun 11 14:45:48 CST 2020

this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020

我們看到消費時間比生成時間晚5s,符合我們的預期。這個功能還是比較實用的,如果能夠自定義延遲時間就更好了。

總結

RocketMQ的這兩個知識點還是比較簡單的,大家要分清楚什么是消費者組,什么是消費者,什么是消費者線程。另外就是延遲消息是不支持自定義的,大家可以在Github上看一下源碼。好了~今天就到這里了。

總結

以上是生活随笔為你收集整理的rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 神马午夜在线观看 | 久久久久久久久久av | 色偷偷噜噜噜亚洲男人的天堂 | 国产精品理伦片 | av美女在线 | av导航大全 | 色播五月激情 | 三级男人添奶爽爽爽视频 | 91丨porny丨 | 国产jjizz一区二区三区视频 | 嫩草影院永久入口 | 澳门黄色录像 | 成人性生交大片免费看 | 国产精品夜夜 | 黄色激情在线 | 日韩精品中文字幕一区二区 | 中文字幕88 | 91网站大全| 男人操女人的软件 | 国产精品无码久久久久一区二区 | 黄色福利社 | 国产污污网站 | 国产不卡视频 | 欧美午夜精品久久久久久人妖 | 亚洲va欧美va国产综合久久 | 欧美精品欧美极品欧美激情 | 免费看黄色的网址 | 自由 日本语 热 亚洲人 | 亚洲午夜一区 | 国产精品二区一区二区aⅴ 一卡二卡三卡在线观看 | 天天宗合 | 先锋影视av| 色玖玖| 国产免费一区二区三区视频 | 在线中文天堂 | 久久久久久久久久久久91 | 精品国产一区二区三区性色av | 91福利在线视频 | 久久久久国产精品 | 国产精品日本一区二区在线播放 | 黄页网站在线播放 | 激情五月婷婷网 | 日本污视频在线观看 | 日本中文字幕久久 | 日本涩涩网站 | 中文字幕.com | 天天操夜夜爱 | 国产精品久久久无码一区 | 黄色av电影网址 | 欧美一区二区在线免费观看 | 毛片毛片毛片毛片毛片毛片毛片毛片毛片毛片 | 天天艹夜夜 | 99精品毛片| xxxx日本少妇| 综合 欧美 亚洲日本 | 日日草日日干 | 日日摸日日碰夜夜爽无码 | 制服丝袜天堂网 | 极品探花在线观看 | 尹人综合 | 午夜精品久久久久久久99黑人 | 熟女人妇 成熟妇女系列视频 | 国产91在线高潮白浆在线观看 | av午夜天堂 | 岛国av不卡 | 懂色aⅴ国产一区二区三区 亚洲欧美国产另类 | 亚洲精品ww| 欧美性猛交ⅹxxx乱大交3 | 欧美色啪| 中文字幕中出 | 国产日韩视频在线观看 | 一级片a级片 | 影音先锋亚洲资源 | 久久伊人五月天 | 国模二区 | 亚洲一区中文 | 少妇激情视频 | 亚洲一区二区网站 | 后宫秀女调教(高h,np) | 欧美a视频 | 韩国av永久免费 | 中国黄色网页 | 玩弄丰满少妇xxxxx性多毛 | 国产3级| 色欲AV无码精品一区二区久久 | 欧美视频免费在线观看 | 尤物视频在线观看视频 | 福利电影一区二区 | 狠狠操av| 中国女人av | 波多野结衣精品 | 色女人天堂 | 中文在线√天堂 | 亲子乱子伦xxxx | 久热最新 | 中文在线中文资源 | 欧美国产高潮xxxx1819 | 国产网站久久 | 超碰98在线观看 |