javascript
Spring Cloud —— RocketMQ 的消息类型
導航
- 引言
- 一、普通消息
- 1.1 可靠同步發送
- 1.2 可靠異步發送
- 1.3 單向發送
- 二、順序消息
- 三、事務消息
- 3.1 什么是事務消息
- 3.2 事務消息示例
- 1、編寫本地事務邏輯
- 2、發送半事務消息
- 3、注冊本地事務監聽器
- 4、測試
引言
本文承接《Spring Cloud —— 消息隊列與 RocketMQ》
RocketMQ 提供了多種場景所需的消息類型,包括普通消息、順序消息、事務消息,本文分別針對這些消息類型予以展開介紹。
一、普通消息
普通消息分為三種發送方式:可靠同步發送、可靠異步發送、單向發送。
簡言之,可靠同步發送就是消息發送方直到收到MQ的發送結果才發送下一條消息;可靠異步發送就是消息接收方暫時不關心發送結果,連續發送消息,采用消息發送回調的方式接收MQ的發送結果響應;單向發送就是不同步等待發送結果也不設置任何回調函數。
1.1 可靠同步發送
可靠同步發送,表示發送方會同步等待 MQ 的發送結果,可以使用 rocketMQTemplate.syncSend(…) 來實現。
syncSend 有很多重載方法,包括可以在參數列表中指定一個毫秒級的超時時間。
syncSend 如何設置標簽?
syncSend(“topic:tag”, 其他參數);
@Slf4j @SpringBootTest(classes = OrderApplication.class) @RunWith(SpringRunner.class) public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testSyncSend() {SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1:testTag", "這是一條同步消息");log.info("同步消息發送結果:{}", sendResult);} }1.2 可靠異步發送
可靠異步發送,表示不等待MQ返回響應,而通過回調接口接收服務器響應,并對發送結果進行處理。異步發送一般用于鏈路耗時較長,對RT 響應時間較為敏感的業務場景。
由于junit運行完會立即退出,因此需要 Thread.sleep 避免 JVM shutdown,實際開發不需要。
@Slf4j @SpringBootTest(classes = OrderApplication.class) @RunWith(SpringRunner.class) public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testAsyncSend() throws InterruptedException {rocketMQTemplate.asyncSend("test-topic-1", "這是一條異步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("發送結果:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("消息發送異常,{}", throwable);}});System.out.println("================");// 實際開發不需要Thread.sleep(10000);} }執行結果:
================ 2021-10-05 09:04:16.284 INFO [service-order,,,] 7608 --- [ublicExecutor_1] com.morty.rocketmq.MessageTypeTest : 發送結果:SendResult [sendStatus=SEND_OK, msgId=C0A803781DB858644D46168BB8FC0000, offsetMsgId=C0A8018C00002A9F000000000002FF47, messageQueue=MessageQueue [topic=test-topic-1, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=1]1.3 單向發送
單向發送,表示發送方只負責發送消息,不等待服務器回應,且沒有回調函數觸發,即只發送請求不等待應答。
適用于某些耗時非常短,但對可靠性要求不高的場景,例如日志收集。
二、順序消息
順序消息是消息隊列提供的一種嚴格按照順序來發布和消費的消息類型。
Broker 中默認有4個 ConsumeQueue 用來作為消息的傳輸通道,如果不做特殊要求,消息會分散到不同的 Queue 中,導致消息的亂序。因此,如果希望消息嚴格保證順序發送和接收,就必須可以保證順序的消息發送 API ,使得這些 Message 可以發送到同一個 Queue 中。
對于可靠同步、可靠異步,以及單向發送的場景,都提供了 xxxSendOrderly(…) 方法,除了保證消息可以分配到同一個 queue 中,以保證消息的有序性之外,沒有任何其他區別。
sendOrderly(…) 方法除了需要基本的信息之外,還需要傳入一個唯一的 HashKey,只要能夠保證唯一即可。
如何驗證消息是否被分配到了同一個 queue ?在RocketMQ 控制臺的主題中找到如下按鈕:
如果消息能夠發送到同一個 queue,那么這幾個 queue 中只會有一個 queue 的最大位點發生變化,由此就可以推斷消息是否被分配到了同一個 queue 中:
三、事務消息
本節內容參考:消息類型-事務消息
3.1 什么是事務消息
RocketMQ 提供了事務消息,通過事務消息就能達到分布式事務的最終一致性。
上圖是 RocketMQ 提供的事務消息工作流程圖,這是一種非常典型的分布式事務的解決方案。
半事務消息(half message)
指暫不能投遞的消息,發送方已經成功地將消息發送到 RocketMQ 服務端,但是MQ未收到生產者對該消息的二次確認,此時該消息被標記為“暫不能投遞”狀態,處于該種狀態下的消息即半事務消息。
消息回查(check back)
MQ服務端針對半事務消息主動向生產者查詢其事務狀態。由于網絡閃斷、生產者重啟等原因,導致某些事務消息的二次確認丟失,MQ服務端通過掃描發現某些消息長期處于“半事務消息”狀態,需要主動向消息生產者詢問該消息的最終狀態(commit 或 rollback),該詢問過程即為消息回查。
3.2 事務消息示例
完成一個訂單創建的事務消息案例。本地事務采用本地事務表的方式記錄事務的狀態。
本地事務表
或本地消息表,是一種記錄本地事務狀態的獨立表結構,專門用于存儲事務信息,簡化并統一本地事務的回查邏輯。表中的每條記錄都代表一個已經成功執行的事務。一般會將本地事務表的入庫操作和某個業務放在同一個事務中,這樣就可以保證事務信息存在,那么事務一定成功。
事務消息的編碼步驟要緊扣 RocketMQ 事務消息的流程。
1、編寫本地事務邏輯
為下單邏輯增加事務屬性,并在其中加入事務消息記錄的邏輯。使用 shop_tx_log 來完成本地事務記錄的工作,在執行下單后,同一事務中,完成事務入庫的操作。
@Data @Entity(name = "shop_tx_log") public class TxLog {@Idprivate String txId;private Date date; } public interface TxLogDao extends JpaRepository<TxLog, String> { } @Transactional public void createOrder(String txId, Order order) {// 保存訂單orderDao.save(order);TxLog txLog = new TxLog();txLog.setTxId(txId);txLog.setDate(new Date());// 記錄事務日志txLogDao.save(txLog); }2、發送半事務消息
在 OrderService 下新增半事務消息發送接口:
/*** 下單半事務消息*/ public void createOrderHalfMsg(Order order) {String txId = UUID.randomUUID().toString();rocketMQTemplate.sendMessageInTransaction("tx_producer_group","tx_topic",MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order); }sendMessageInTransaction(…) 方法傳入四個參數
第3個參數:org.springframework.messaging.support.MessageBuilder 用于構建 Message 對象,withPayload() 傳入一個核心的消息實體對象,setHeader() 可以為 Message 對象設置消息頭,這里把 txId 放入消息頭中以備后面的消息回查。
第4個參數:Object 對象,用于后續執行本地事務時需要使用的數據
這一步驟是 RocketMQ 事務消息的第一步——發送半事務消息,也是代表開啟一個以RocketMQ 為基礎的分布式事務,除了設置一些基本的消息內容之外(分組、主題等),還需要通過構建MessageBuilder來構建Message,并綁定一個該分布式事務的 transaction Id,和執行后面執行本地事務的必要參數。
3、注冊本地事務監聽器
RocketMQLocalTransactionListener 提供了事務消息流程中“執行本地事務”和“消息回查”兩個步驟的監聽入口。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message;/*** 本地事務監聽器** @data 2021/10/5 15:03*/ @Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class OrderMQListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Autowiredprivate TxLogDao txLogDao;/*** 執行本地事務*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {String txId = (String) message.getHeaders().get("txId");// 執行本地事務orderService.createOrder(txId, (Order) o);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}/*** 事務回查*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String txId = (String) message.getHeaders().get("txId");TxLog txLog = txLogDao.findById(txId).get();if (txLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}} }這里有個小問題,在發送半事務消息的時候,已經有發送結果,那為什么不直接在收到發送成功的響應后直接執行本地事務呢?還要再創建 executeLocalTransaction 這樣的回調方法才去執行本地事務?
我認為是因為由MQ主動調用回調函數來執行本地事務具有更強的可靠性。如果直接以發送半消息的結果作為依據來執行本地事務,一旦由于網絡或發送端重啟等原因未收到半消息的發送結果,就會導致本地事務無法觸發,系統的容錯性偏低。而提供了回調接口,就可以由MQ來觸發本地事務的執行,MQ也可以更好的將本地事務的執行和MQ半事務消息的提交綁定到同一個事務中,更利于事務的管控。
4、測試
上面三步已經基本把事務消息的代碼編寫完畢,只要在 Controller 層調用 半消息發送方法就可以完成整個事務消息功能。
這里需要對 OrderMQListener 的兩個回調函數 executeLocalTransaction() 和 checkLocalTransaction() 打上斷點,并檢查執行 executeLocalTransaction 時是否完成 txLog 對象的入庫。
從測試結果來看,并沒有什么問題。
如何測試消息回查呢?我們可以直接在 executeLocalTransaction() 返回前殺死 order-service ,這樣MQ Server 就收不到二次確認的信息,從而會觸發消息回查方法。
可以使用 kill 命令,這里簡單介紹下 Windows 下是如何操作的。
D:\idea-workspace\shop>jps 11792 Jps 18372 RemoteMavenServer 23284 OrderApplication 9780 rocketmq-console-ng-1.0.0.jar 13080 Launcher 15672 nacos-server.jar 20840 ProductApplication 8200 D:\idea-workspace\shop>taskkill -F /pid 23284 成功: 已終止 PID 為 23284 的進程。OrderApplication 已經停止,再次啟動后,不多一會就可以收到 MQ 的消息回查請求觸發 checkLocalTransaction() 方法。測試成功!
總結
以上是生活随笔為你收集整理的Spring Cloud —— RocketMQ 的消息类型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html页面加载时执行ajax请求,页面
- 下一篇: java偶数和_Java编程计算1-10