RocketMQ原理解析-producer 4.发送分布式事物消息
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
RocketMQ原理解析-producer 4.發(fā)送分布式事物消息 博客分類: MQ為什么消息要具備事務能力
還是比較清晰的。簡單的說 就是在你業(yè)務邏輯過程中,需要發(fā)送一條消息給訂閱消息的人,但是期望是 此邏輯過程完全成功完成之后才能使訂閱者收到消息。
業(yè)務邏輯過程 假設是這樣的:
邏輯部分a-->發(fā)消息給MQ-->邏輯部分b
假設我們在發(fā)送消息給MQ之后執(zhí)行邏輯部分b時產生了異常,那如果MQ不具備事務消息能力時,訂閱者也收到了消息。這是我們不希望見到的。
分布式事務基礎概念
rocketmq具備事務能力的demo
參見TransactionProducerDemo.java
向producer注冊的TransactionCheckListener實現(xiàn)并沒有用,因為返回LocalTransactionState.UNKNOW狀態(tài)時,在3.2.6版本中,是不支持此狀態(tài)下回調TransactionCheckListener的,具體參見以下兩個issue。
事務消息 LocalTransactionState.UNKNOW 狀態(tài)下不回查 #221
開源版本支持事務消息嗎 #364
測試過程中發(fā)現(xiàn)返回UNKNOW狀態(tài)是不能正確達到期望的,但是返回ROLLBACK_MESSAGE狀態(tài)還是能達到期望的。
實現(xiàn)分析入口
這個實現(xiàn)的入口還是比較容易找的,只要搜尋ROLLBACK_MESSAGE這個變量的引用即可發(fā)現(xiàn)。順著搜索查看,其實很容易發(fā)現(xiàn),客戶端在收到業(yè)務邏輯返回的事務狀態(tài)時會發(fā)送一條結束事務的指令給broker。
// com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行 RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);按broker對外部指令的常規(guī)做法,一般會有一個Processor與之對應。是EndTransactionProcessor,看BrokerController374行其注冊的地方,沒錯。
EndTransactionProcessor分析(broker側)
如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE時,EndTransactionProcessor會清空message的body的置成null,queueOffset也不會更新,那么consumer就收不到消息了。
//--EndTransactionProcessor.processRequest 200行-- if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {msgInner.setBody(null); }如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor則會照常put message。
事務消息分為兩個階段,prepare階段與commit階段。prepare階段的消息會寫入store,只是寫完之后的queueOffset(邏輯位置)為0(commit階段寫完消息后的queueOffset就不是0了。);
?
?
// -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 -- final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queue case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default:break;待分析問題列表:
1. prepare階段已經將消息發(fā)了過去,commit的時候是否還會再發(fā)送一次消息?
2. rollback的時候是否會將prepare的消息刪除?
?
http://www.cnblogs.com/simoncook/p/6478196.html
?
分布式事物是基于二階段提交的
1)??????一階段,向broker發(fā)送一條prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared狀態(tài)消息不被消費
發(fā)送消息ok,執(zhí)行本地事物分支, 本地事物方法需要實現(xiàn)rocketmq的回調接口2)2)2) LocalTransactionExecuter,處理本地事物邏輯返回處理的事物狀態(tài)LocalTransactionState
3)? 二階段,處理完本地事物中業(yè)務得到事物狀態(tài), 根據offset查找到commitLog中的prepared消息,設置消息狀態(tài)commitType或者rollbackType, 讓后將信息添加到commitLog中, 其實二階段生成了兩條消息
?
?
?
事物消息發(fā)送
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142389/
?
轉載于:https://my.oschina.net/xiaominmin/blog/1597808
總結
以上是生活随笔為你收集整理的RocketMQ原理解析-producer 4.发送分布式事物消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑不能安装虚拟机--解决办法
- 下一篇: css案例学习之relative与abs