日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

發布時間:2025/3/21 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章轉載自公眾號??心源意碼?,?作者 尋箏

得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。

由阿里自研的RocketMQ更是經歷了多年的雙十一高并發挑戰,其中4.3.0版本推出了事務消息的新特性本文對RocketMQ 4.5.0版本事務消息相關的源碼跟蹤介紹,通過閱讀可以知道:

  • 事務消息解決什么樣的問題

  • 事務消息的實現原理及其設計亮點

01 解決什么問題

假設我所在的系統現在有這樣一個場景:

本地開啟數據庫事務進行扣款操作,成功后發送MQ消息給庫存中心進行發貨。

有人會想到開啟mybatis事務實現,把本地事務和MQ消息放在一起不就行了嗎?如果MQ發送成功,就提交事務,發送失敗就回滾事務,整套操作一氣呵成。

transaction{扣款();boolean success = 發送MQ();if(success){commit();}else{rollBack();} }

看似沒什么問題,但是網絡是不可靠的。

假設MQ返回過來的響應因為網絡原因遲遲沒有收到,所以在面對不確定的MQ返回結果只好進行回滾。但是MQ 服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。

既然MQ消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢?答案就是今天我們介紹的主角:事務消息

02 概覽

總體而言RocketMQ事務消息分為兩條主線

  • 定時任務發送流程:發送half message(半消息),執行本地事務,發送事務執行結果

  • 定時任務回查流程:MQ服務器回查本地事務,發送事務執行結果

  • 因此本文也通過這兩條主線對源碼進行分析

    03 源碼分析

    半消息發送流程

    本地應用(client)

    在本地應用發送事務消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復用大部分發送消息相關的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法

    @Override public TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException {if (null == this.transactionListener) {throw new MQClientException("TransactionListener is null", null);}return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }

    這個方法做了兩件事,

  • 檢查transactionListener是否存在

  • 調用父類執行事務消息發送

  • TransactionListener在事務消息流程中起到至關重要的作用,一起看看這個接口

    public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.** @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);/*** When no response to prepare(half) message. broker will send check message to check the transaction status, and this* method will be invoked to get local transaction status.** @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg); }

    接口注釋說的很明白,配合上面的概覽圖來看就是,executeLocalTransaction方法對應的就是執行本地事務操作,checkLocalTransaction對應的就是回查本地事務操作。

    下面是DefaultMQProducer類的

    sendMessageInTransaction方法源碼

    public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {...SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());...sendResult = this.send(msg);...switch (sendResult.getSendStatus()) {case SEND_OK: {...localTransactionState = transactionListener.executeLocalTransaction(msg, arg);...break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}...this.endTransaction(sendResult, localTransactionState, localException);... }

    為了使源碼的邏輯更加直觀,筆者精簡了核心代碼。sendMessageInTransaction方法主要做了以下事情

  • 給消息打上事務消息相關的標記,用于MQ服務端區分普通消息和事務消息

  • 發送半消息(half message)

  • 發送成功則由transactionListener執行本地事務

  • 執行endTransaction方法,如果半消息發送失敗本地事務執行失敗告訴服務端是刪除半消息,半消息發送成功本地事務執行成功則告訴服務端生效半消息。

  • 發送半消息流程,Client端代碼到這里差不多就結束了,接下來看看RocketMQ Server端是如何處理的

    RocketMQ Server

    Server在接收到消息過后會進行一些領域對象的轉化和是否支持事務消息的權限校驗,對理解事務消息用處不大,此處就省略對旁枝末節的介紹了。下面是TransactionalMessageBridge類處理half message的源碼

    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {return store.putMessage(parseHalfMessageInner(messageInner)); }private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner; }

    這兩個方法主要做了以下事情:

    public?class?Message?implements?Serializable?{private String topic;private int flag;private Map<String, String> properties;private byte[] body;private String transactionId; }
  • 將消息的topic,queueId放進消息體自身的map里進行緩存

  • 將消息的topic設置為“RMQ_SYS_TRANS_OP_HALF_TOPIC”,將queueId設置為0

  • 將消息寫入磁盤持久化

  • 可以看到所有的事務半消息都會被放進同一個topic的同一個queue里面,通過對topic的區分,從而避免了半消息被consumer給消費到

    Server將半消息持久化后然后會發送結果給我們本地的應用程序。到了這里Server端對半消息的處理就結束了,緊接著的是定時任務的登場。

    定時任務回查流程

    RocketMQ Server

    定時任務是一個叫TransactionalMessageService類的線程,下面是該類的check方法

    @Override public void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {...if?(!putBackHalfMsgQueue(msgExt,?i))?{continue;}listener.resolveHalfMsg(msgExt);} ... }

    check方法非常長,省略的代碼大致都是對半消息進行過濾操作(如超過72小時的事務消息,就被算作過期),只保留符合條件的半消息對其進行回查。

    其中很有意思的是putBackHalfMsgQueue方法,因為每次把半消息從磁盤拉到內存里進行處理都會對其屬性進行改變(例如TRANSACTION_CHECK_TIMES,這是是否丟棄事務消息的關鍵信息)。

    所以在發送回查消息之前需要對半消息再次放進磁盤。

    RocketMQ采取的方法是基于最新的物理偏移量重新寫入,而不是對原有的半消息進行修改,其中的目的就是RocketMQ的存儲設計采用順序寫,如果去修改消息 ,無法做到高性能。

    下面是resolveHalfMsg方法,主要就是開啟一個線程然后發送check消息。

    public void resolveHalfMsg(final MessageExt msgExt) {executorService.execute(new Runnable() {@Overridepublic void run() {try {sendCheckMessage(msgExt);} catch (Exception e) {LOGGER.error("Send check message error!", e);}}}); }

    本地應用(client)

    下面是DefaultMQProducerImpl的checkTransactionState方法,是本地應用對回查消息的處理邏輯

    @Override public void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = new Runnable() {...@Overridepublic void run() {...TransactionListener transactionListener = getCheckListener();...localTransactionState?=?transactionListener.checkLocalTransaction(message);...???????????this.processTransactionState(localTransactionState,group,exception); }private void processTransactionState(...DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);...}};this.checkExecutor.submit(request); }

    精簡代碼邏輯后可以清晰的看到

    • 開啟一個線程來執行回查的邏輯

    • 執行transactionListener的checkLocalTransaction方法來獲取本地事務執行的結果

    RocketMQ Server

    RocketMQ 服務器在收到Client發過來的Commit消息后會

    讀出半消息——>恢復topic等原消息體的信息——>和普通消息一樣再次寫入磁盤——>刪除之前的半消息

    如果是Rollback消息則直接刪除之前的半消息

    到此,整條RocketMQ 事務消息的調用鏈就結束了

    04 思考

    1. 分布式事務等于事務消息嗎?

    兩者并沒有關系,事務消息僅僅保證本地事務和MQ消息發送形成整體的原子性,而投遞到MQ服務器后,消費者是否能一定消費成功是無法保證的。

    2. 源碼設計上有什么亮點嗎?

    通過對整條鏈路源碼的學習理解發現還是有不少亮點的

    • server端回查消息的發送,client端回查消息邏輯的處理,client端commit/rollback消息的提交都是用了異步進行,可以說能異步的地方都用了異步,通過異步+重試的方式保證了在分布式環境中即使短暫的網絡狀況不良好,也不會影響整體邏輯。

    • 引入TransactionListener,真正做到了開閉原則以及依賴倒置原則,面向接口編程。整體擴展性做得非常好,使用者只需要編寫自己的Listener就可以做到事務消息的發送,非常方便

    • TransactionMQProducer通過繼承DefaultMQProducer極大地復用了關于發送消息相關的邏輯

    3. 源碼設計上有什么不足嗎?

    RocketMQ作為一款極其成功的消息中間件,要發現不足不是那么容易了,筆者談幾點看法

    • sendMessageIntransaction等事務相關的方法被劃分在了DefaultMQProducer里面,從內聚的角度來說這是跟事務相關的發送消息方法應該被劃分在TransactionMQProducer。

    • 所有topic的半消息都會寫在topic為RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。

    總結

    以上是生活随笔為你收集整理的通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 实拍澡堂美女洗澡av | 北条麻妃av在线播放 | 日韩毛片 | 中文字幕+乱码+中文乱 | 天码人妻一区二区三区在线看 | 怡红院最新网址 | 免费一二区 | a级黄色片网站 | 欧美另类高清videos的特点 | 肥婆大荫蒂欧美另类 | 中文字幕一区二区在线播放 | 交专区videossex非洲 | 在线视频一区二区三区 | 女同av在线播放 | 亚洲综合日韩精品欧美综合区 | 久久久久久不卡 | 中文字幕在线免费 | 又黄又高潮的视频 | se综合| 久久综合av | av免播放器 | 国产亚洲成av人在线观看导航 | 国产乱淫av片免费 | jav中文字幕 | 日韩精品视频在线观看网站 | 亚色av| 国产成人资源 | 高潮网址 | 91美女免费看 | 免费视频黄色 | 蜜臀av中文字幕 | 在线观看日韩一区 | 91精品又粗又猛又爽 | 日本www黄 | 国产精品无码电影在线观看 | 日韩视频免费观看高清完整版 | 国产欧美在线播放 | 色偷偷久久| 蜜桃va | 久久精品免费播放 | 九九热视频精品在线观看 | 又黄又骚的视频 | 亚洲爱| 中文字幕成人一区 | 欧美日韩小视频 | 激情五月色播五月 | 黄色片成人 | 天天干少妇 | 欧美毛片网站 | 日韩高清免费观看 | 国产91在线看| 日韩欧美中文字幕在线播放 | 男女啊啊啊 | 波多野结衣中文字幕一区二区三区 | 可以看的av网站 | 日韩精品网址 | 国产主播自拍av | 一个色在线视频 | 久草国产精品视频 | 香蕉视频国产在线观看 | 中文字幕第5页 | 日韩在线一区二区三区四区 | 天天夜夜草 | 少妇人妻综合久久中文字幕 | 久久久久人妻一区精品 | 91亚洲精品久久久蜜桃 | 日韩精品在线网站 | 超碰视屏| 久久精品一区二区 | 久久久久国产精品无码免费看 | 亚洲色图网址 | 色多多在线观看 | 国产一区二区三区影院 | av网站免费在线看 | av在线视 | 亚洲精品一区二区潘金莲 | 少妇肥臀大白屁股高清 | 性生活视屏 | 国产欧美一区二区三区精品酒店 | 日韩avv| 欧美日本二区 | 日本天天色 | 99精品久久久 | 国产ts变态重口人妖hd | 国产无毛片 | 二色av| 久久久久久久久久艹 | av免费网址在线观看 | 91视频99| 日韩欧洲亚洲AV无码精品 | 亚洲无卡视频 | 亚洲人体视频 | 亚洲大尺度在线观看 | 北京少妇xxxx做受 | 亚洲综合色网 | 色宗合 | 国产日韩欧美视频在线观看 | 香蕉视频成人在线 | 亚洲香蕉在线视频 |