日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

發布時間:2023/12/10 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq 消费方式_RocketMQ事务消费和顺序消费详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、RocketMq有3中消息類型

1.普通消費

2. 順序消費

3.事務消費

順序消費場景

在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費。

rocketMq實現順序消費的原理

produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息

注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue

單個節點(Producer端1個、Consumer端1個)

1、Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,發送順序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

2、Consumer.java

packageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費

* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");

consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設置自動提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",內容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

結果如下圖所示:

這個五條數據被順序消費了

多個節點(Producer端1個、Consumer端2個)

Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,發送順序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},1);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},2);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

Consumer1.java

/*** 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費

* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到

*,第二個線程無法訪問這個隊列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設置自動提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",內容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

Consumer2.java

/*** 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)*/

public classConsumer2 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費

* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到

*,第二個線程無法訪問這個隊列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//設置自動提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",內容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer2 Started.");

}

}

先啟動Consumer1和Consumer2,然后啟動Producer,Producer會發送15條消息

Consumer1消費情況如圖,都按照順序執行了

Consumer2消費情況如圖,都按照順序執行了

二、事務消費

這里說的主要是分布式事物。下面的例子的數據庫分別安裝在不同的節點上。

事物消費需要先說說什么是事務。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之后,我的建設銀行也必須加1000元。這樣才能保證數據的一致性。假如工商銀行轉1000元之后,建設銀行的服務器突然宕機,那么我扣除了1000,但是并沒有在建設銀行給我加1000,就出現了數據的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。

再比如,我們進行網購的時候,我們下單之后,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫里面。有可能訂單提交成功之后,倉庫數量服務器突然宕機。這樣也出現了數據不一致的問題。

使用消息隊列來解決分布式事物:

現在我們去外面飯店吃飯,很多時候都不會直接給了錢之后直接在付款的窗口遞飯菜,而是付款之后他會給你一張小票,你拿著這個小票去出飯的窗口取飯。這里和我們的系統類似,提高了吞吐量。即使你到第二個窗口,師傅告訴你已經沒飯了,你可以拿著這個憑證去退款,即使中途由于出了意外你無法到達窗口進行取飯,但是只要憑證還在,可以將錢退給你。這樣就保證了數據的一致性。

如何保證憑證(消息)有2種方法:

1、在工商銀行扣款的時候,余額表扣除1000,同時記錄日志,而且這2個表是在同一個數據庫實例中,可以使用本地事物解決。然后我們通知建設銀行需要加1000給該用戶,建設銀行收到之后給我返回已經加了1000給用戶的確認信息之后,我再標記日志表里面的日志為已經完成。

2、通過消息中間件

總結

以上是生活随笔為你收集整理的rocketmq 消费方式_RocketMQ事务消费和顺序消费详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。