日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ原理解析-producer 4.发送分布式事物消息

發(fā)布時間:2025/3/21 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ原理解析-producer 4.发送分布式事物消息 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標準>>>

RocketMQ原理解析-producer 4.發(fā)送分布式事物消息 博客分類: MQ

為什么消息要具備事務能力


還是比較清晰的。簡單的說 就是在你業(yè)務邏輯過程中,需要發(fā)送一條消息給訂閱消息的人,但是期望是 此邏輯過程完全成功完成之后才能使訂閱者收到消息。
業(yè)務邏輯過程 假設是這樣的:
邏輯部分a-->發(fā)消息給MQ-->邏輯部分b
假設我們在發(fā)送消息給MQ之后執(zhí)行邏輯部分b時產生了異常,那如果MQ不具備事務消息能力時,訂閱者也收到了消息。這是我們不希望見到的。

分布式事務基礎概念

  • 關于分布式事務、兩階段提交協(xié)議、三階提交協(xié)議
  • 理解分布式事務的兩階段提交2pc
  • 分布式事務(一)兩階段提交及JTA
  • 分布式系統(tǒng)常用思想和技術總結
  • 【整理】腦裂問題
  • 分布式系統(tǒng)的事務處理
  • 多版本并發(fā)控制(MVCC)在分布式系統(tǒng)中的應用
  • 戲說PAXOS
  • 阿里云消息隊列 MQ關于事務消息的文檔
  • rocketmq具備事務能力的demo

    參見TransactionProducerDemo.java

    向producer注冊的TransactionCheckListener實現(xiàn)并沒有用,因為返回LocalTransactionState.UNKNOW狀態(tài)時,在3.2.6版本中,是不支持此狀態(tài)下回調TransactionCheckListener的,具體參見以下兩個issue。

    事務消息 LocalTransactionState.UNKNOW 狀態(tài)下不回查 #221
    開源版本支持事務消息嗎 #364
    測試過程中發(fā)現(xiàn)返回UNKNOW狀態(tài)是不能正確達到期望的,但是返回ROLLBACK_MESSAGE狀態(tài)還是能達到期望的。

    實現(xiàn)分析入口

    這個實現(xiàn)的入口還是比較容易找的,只要搜尋ROLLBACK_MESSAGE這個變量的引用即可發(fā)現(xiàn)。順著搜索查看,其實很容易發(fā)現(xiàn),客戶端在收到業(yè)務邏輯返回的事務狀態(tài)時會發(fā)送一條結束事務的指令給broker。

    // com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行 RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

    按broker對外部指令的常規(guī)做法,一般會有一個Processor與之對應。是EndTransactionProcessor,看BrokerController374行其注冊的地方,沒錯。

    EndTransactionProcessor分析(broker側)

    如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE時,EndTransactionProcessor會清空message的body的置成null,queueOffset也不會更新,那么consumer就收不到消息了。

    //--EndTransactionProcessor.processRequest 200行-- if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {msgInner.setBody(null); }

    如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor則會照常put message。

    事務消息分為兩個階段,prepare階段與commit階段。prepare階段的消息會寫入store,只是寫完之后的queueOffset(邏輯位置)為0(commit階段寫完消息后的queueOffset就不是0了。);

    ?

    ?

    // -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 -- final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queue case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default:break;

    待分析問題列表:
    1. prepare階段已經將消息發(fā)了過去,commit的時候是否還會再發(fā)送一次消息?
    2. rollback的時候是否會將prepare的消息刪除?

    ?

    http://www.cnblogs.com/simoncook/p/6478196.html

    ?

    分布式事物是基于二階段提交的

    1)??????一階段,向broker發(fā)送一條prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared狀態(tài)消息不被消費

    發(fā)送消息ok,執(zhí)行本地事物分支, 本地事物方法需要實現(xiàn)rocketmq的回調接口2)2)2) LocalTransactionExecuter,處理本地事物邏輯返回處理的事物狀態(tài)LocalTransactionState

    3)? 二階段,處理完本地事物中業(yè)務得到事物狀態(tài), 根據offset查找到commitLog中的prepared消息,設置消息狀態(tài)commitType或者rollbackType, 讓后將信息添加到commitLog中, 其實二階段生成了兩條消息

    ?

    ?

    ?

    事物消息發(fā)送



    http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142389/

    ?

    轉載于:https://my.oschina.net/xiaominmin/blog/1597808

    總結

    以上是生活随笔為你收集整理的RocketMQ原理解析-producer 4.发送分布式事物消息的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。