日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

必知必会的RocketMQ消息类型

發(fā)布時間:2024/9/27 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 必知必会的RocketMQ消息类型 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

普通消息

普通消息也叫做無序消息,簡單來說就是沒有順序的消息,producer 只管發(fā)送消息,consumer 只管接收消息,至于消息和消息之間的順序并沒有保證,可能先發(fā)送的消息先消費,也可能先發(fā)送的消息后消費。

舉個簡單例子,producer 依次發(fā)送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通消息。

因為不需要保證消息的順序,所以消息可以大規(guī)模并發(fā)地發(fā)送和消費,吞吐量很高,適合大部分場景。

代碼示例

  • 生產(chǎn)者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//聲明并初始化一個producer//需要一個producer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_producerDefaultMQProducer producer = new DefaultMQProducer("concurrent_producer");//設(shè)置NameServer地址,此處應(yīng)改為實際NameServer地址,多個地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//調(diào)用start()方法啟動一個producer實例producer.start();//發(fā)送10條消息到Topic為TopicTest,tag為TagA,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestConcurrent",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);//調(diào)用producer的send()方法發(fā)送消息//這里調(diào)用的是同步的方式,所以會有返回結(jié)果,同時默認(rèn)發(fā)送的也是普通消息SendResult sendResult = producer.send(msg);//打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//發(fā)送完消息之后,調(diào)用shutdown()方法關(guān)閉producerproducer.shutdown();} }
  • 消費者
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//聲明并初始化一個consumer//需要一個consumer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer");//同樣也要設(shè)置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設(shè)置的是一個consumer的消費策略//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊列最尾開始消費,即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設(shè)置consumer所訂閱的Topic和Tag,*代表全部的Tagconsumer.subscribe("TopicTestConcurrent", "*");//設(shè)置一個Listener,主要進(jìn)行消息的邏輯處理//注意這里使用的是MessageListenerConcurrently這個接口consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費狀態(tài)//CONSUME_SUCCESS 消費成功//RECONSUME_LATER 消費失敗,需要稍后重新消費return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調(diào)用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");} }

有序消息

有序消息就是按照一定的先后順序的消息類型。

舉個例子來說,producer 依次發(fā)送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序也就是 1、2、3 ,而不會出現(xiàn)普通消息那樣的 2、1、3 等情況。

那么有序消息是如何保證的呢?我們都知道消息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那么要保證消息的有序,勢必這兩步都是要保證有序的,即要保證消息是按有序發(fā)送到 broker,broker 也是有序?qū)⑾⑼哆f給 consumer,兩個條件必須同時滿足,缺一不可。
進(jìn)一步還可以將有序消息分成

  • 全局有序消息
  • 局部有序消息

之前我們講過,topic 只是消息的邏輯分類,內(nèi)部實現(xiàn)其實是由 queue 組成。當(dāng) producer 把消息發(fā)送到某個 topic 時,默認(rèn)是會消息發(fā)送到具體的 queue 上。


全局有序

舉個例子,producer 發(fā)送 order id 為 1、2、3、4 的四條消息到 topicA 上,假設(shè) topicA 的 queue 數(shù)為 3 個(queue0、queue1、queue2),那么消息的分布可能就是這種情況,id 為 1 的在 queue0,id 為 2 的在 queue1,id 為 3 的在 queue2,id 為 4 的在 queue0。同樣的,consumer 消費時也是按 queue 去消費,這時候就可能出現(xiàn)先消費 1、4,再消費 2、3,和我們的預(yù)期不符。那么我們?nèi)绾螌崿F(xiàn) 1、2、3、4 的消費順序呢?道理其實很簡單,只需要把訂單 topic 的 queue 數(shù)改為 1,如此一來,只要 producer 按照 1、2、3、4 的順序去發(fā)送消息,那么 consumer 自然也就按照 1、2、3、4 的順序去消費,這就是全局有序消息。

由于一個 topic 只有一個 queue ,即使我們有多個 producer 實例和 consumer 實例也很難提高消息吞吐量。就好比過獨木橋,大家只能一個挨著一個過去,效率低下。

那么有沒有吞吐量和有序之間折中的方案呢?其實是有的,就是局部有序消息。


局部有序

我們知道訂單消息可以再細(xì)分為訂單創(chuàng)建、訂單付款、訂單完成等消息,這些消息都有相同的 order id。同時,也只有按照訂單創(chuàng)建、訂單付款、訂單完成的順序去消費才符合業(yè)務(wù)邏輯。但是不同 order id 的消息是可以并行的,不會影響到業(yè)務(wù)。這時候就常見做法就是將 order id 進(jìn)行處理,將 order id 相同的消息發(fā)送到 topicB 的同一個 queue,假設(shè)我們 topicB 有 2 個 queue,那么我們可以簡單的對 id 取余,奇數(shù)的發(fā)往 queue0,偶數(shù)的發(fā)往 queue1,消費者按照 queue 去消費時,就能保證 queue0 里面的消息有序消費,queue1 里面的消息有序消費。

由于一個 topic 可以有多個 queue,所以在性能比全局有序高得多。假設(shè) queue 數(shù)是 n,理論上性能就是全局有序的 n 倍,當(dāng)然 consumer 也要跟著增加才行。在實際情況中,這種局部有序消息是會比全局有序消息用的更多。

示例代碼

  • 生產(chǎn)者
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {// 聲明并初始化一個producer// 需要一個producer group名字作為構(gòu)造方法的參數(shù),這里為ordered_producerDefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer");// 設(shè)置NameServer地址,此處應(yīng)改為實際NameServer地址,多個地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里orderedProducer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");// 調(diào)用start()方法啟動一個producer實例orderedProducer.start();// 自定義一個tag數(shù)組String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};// 發(fā)送10條消息到Topic為TopicTestOrdered,tag為tags數(shù)組按順序取值,// key值為“KEY”拼接上i的值,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {int orderId = i % 10;Message msg =new Message("TopicTestOrdered", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() {// 選擇發(fā)送消息的隊列@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// arg的值其實就是orderIdInteger id = (Integer) arg;// mqs是隊列集合,也就是topic所對應(yīng)的所有隊列int index = id % mqs.size();// 這里根據(jù)前面的id對隊列集合大小求余來返回所對應(yīng)的隊列return mqs.get(index);}}, orderId);System.out.println(sendResult);}orderedProducer.shutdown();} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} }

至于是要實現(xiàn)全局有序,還是局部有序,在此示例代碼中,就取決于 TopicTestOrdered 這個 Topic 的隊列數(shù)了。

  • 消費者
public class Consumer {public static void main(String[] args) throws MQClientException {//聲明并初始化一個consumer//需要一個consumer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer");//同樣也要設(shè)置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設(shè)置的是一個consumer的消費策略//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊列最尾開始消費,即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設(shè)置consumer所訂閱的Topic和Tagconsumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");//設(shè)置一個Listener,主要進(jìn)行消息的邏輯處理//注意這里使用的是MessageListenerOrderly這個接口consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費狀態(tài)//SUCCESS 消費成功//SUSPEND_CURRENT_QUEUE_A_MOMENT 消費失敗,暫停當(dāng)前隊列的消費return ConsumeOrderlyStatus.SUCCESS;}});//調(diào)用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");} }

延時消息

延時消息,簡單來說就是當(dāng) producer 將消息發(fā)送到 broker 后,會延時一定時間后才投遞給 consumer 進(jìn)行消費。

RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應(yīng)延時 1s。level=2 表示 2 級延時,對應(yīng)5s,以此類推。

這種消息一般適用于消息生產(chǎn)和消費之間有時間窗口要求的場景。比如說我們網(wǎng)購時,下單之后是有一個支付時間,超過這個時間未支付,系統(tǒng)就應(yīng)該自動關(guān)閉該筆訂單。那么在訂單創(chuàng)建的時候就會就需要發(fā)送一條延時消息(延時15分鐘)后投遞給 consumer,consumer 接收消息后再對訂單的支付狀態(tài)進(jìn)行判斷是否關(guān)閉訂單。

設(shè)置延時非常簡單,只需要在Message設(shè)置對應(yīng)的延時級別即可:

Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);// 這里設(shè)置需要延時的等級即可msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);

更多技術(shù)干貨,可以掃描下面的二維碼,關(guān)注微信公眾號:馮先生的筆記



作者:馮先生的筆記
鏈接:http://www.jianshu.com/p/11e875074a8f
來源:簡書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

總結(jié)

以上是生活随笔為你收集整理的必知必会的RocketMQ消息类型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。