技术干货 | 源码解析 Github 上 14.1k Star 的 RocketMQ
簡(jiǎn)介:?站在發(fā)送方視角,通過(guò)源碼,來(lái)分析在事務(wù)消息發(fā)送中 RocketMQ 是如何工作的。
前言
Apache RocketMQ 作為廣為人知的開(kāi)源消息中間件,誕生于阿里巴巴,于 2016 年捐贈(zèng)給了 Apache。從 RocketMQ 4.0 到如今最新的 v4.7.1,不論是在阿里巴巴內(nèi)部還是外部社區(qū),都贏得了廣泛的關(guān)注和好評(píng)。
本文將站在發(fā)送方視角,通過(guò)閱讀 RocketMQ Producer 源碼,來(lái)分析在事務(wù)消息發(fā)送中 RocketMQ 是如何工作的。
需要說(shuō)明的是,本文所貼代碼,均來(lái)自 4.7.1 版本的 RocketMQ 源碼。本文中所討論的發(fā)送,僅指從 Producer 發(fā)送到 Broker 的過(guò)程,并不包含 Broker 將消息投遞到 Consumer 的過(guò)程。
宏觀概覽
RocketMQ 事務(wù)消息發(fā)送流程:
結(jié)合源碼來(lái)看,RocketMQ 的事務(wù)消息 TransactionMQProducer 的 sendMessageInTransaction 方法,實(shí)際調(diào)用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我們進(jìn)入 sendMessageInTransaction 方法,整個(gè)事務(wù)消息的發(fā)送流程清晰可見(jiàn)。
首先,做發(fā)送前檢查,并填入必要參數(shù),包括設(shè) prepare 事務(wù)消息。
源碼清單-1
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameterif (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());進(jìn)入發(fā)送處理流程:
源碼清單-2
try {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}根據(jù) broker 返回的處理結(jié)果決策本地事務(wù)是否執(zhí)行,半消息發(fā)送成功則開(kāi)始本地事務(wù)執(zhí)行:
源碼清單-3
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) { log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg); }if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE: // 當(dāng)備broker狀態(tài)不可用時(shí),半消息要回滾,不執(zhí)行本地事務(wù)localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}本地事務(wù)執(zhí)行結(jié)束,根據(jù)本地事務(wù)狀態(tài)進(jìn)行二階段處理:
源碼清單-4
try {this.endTransaction(sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}// 組裝發(fā)送結(jié)果// ...return transactionSendResult; }接下來(lái),我們深入每個(gè)階段代碼分析。
深扒內(nèi)幕
Ⅰ階段發(fā)送
重點(diǎn)分析 send 方法。進(jìn)入 send 方法后,我們發(fā)現(xiàn),RocketMQ 的事務(wù)消息的一階段,使用了 SYNC 同步模式:
源碼清單-5
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }這一點(diǎn)很容易理解,畢竟事務(wù)消息是要根據(jù)一階段發(fā)送結(jié)果來(lái)決定要不要執(zhí)行本地事務(wù)的,所以一定要阻塞等待 broker 的 ack。
我們進(jìn)入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的實(shí)現(xiàn),通過(guò)讀這個(gè)方法的代碼,來(lái)嘗試了解在事務(wù)消息的一階段發(fā)送過(guò)程中 producer 的行為。
值得注意的是,這個(gè)方法并非為事務(wù)消息定制,甚至不是為 SYNC 同步模式定制的,因此讀懂了這段代碼,基本可以對(duì) RocketMQ 的消息發(fā)送機(jī)制有了一個(gè)較為全面的認(rèn)識(shí)。
這段代碼邏輯非常通暢,不忍切片。為了節(jié)省篇幅,將代碼中較為繁雜但信息量不大的部分以注釋代替,盡可能保留流程的完整性。個(gè)人認(rèn)為較為重要或是容易被忽略的部分,以注釋標(biāo)出,后文還有部分細(xì)節(jié)的詳細(xì)解讀。
源碼清單-6
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();// 一、消息有效性校驗(yàn)。見(jiàn)后文Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 獲取當(dāng)前topic的發(fā)送路由信息,主要是要broker,如果沒(méi)找到則從namesrv獲取TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 二、發(fā)送重試機(jī)制。見(jiàn)后文int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {// 第一次發(fā)送是mq == null, 之后都是有broker信息的String lastBrokerName = null == mq ? null : mq.getBrokerName();// 三、rocketmq發(fā)送消息時(shí)如何選擇隊(duì)列?——broker異常規(guī)避機(jī)制 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 發(fā)送核心代碼sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// rocketmq 選擇 broker 時(shí)的規(guī)避機(jī)制,開(kāi)啟 sendLatencyFaultEnable == true 才生效this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {// 四、RocketMQ的三種CommunicationMode。見(jiàn)后文case ASYNC: // 異步模式return null;case ONEWAY: // 單向模式return null;case SYNC: // 同步模式if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {// ...// 自動(dòng)重試} catch (MQClientException e) {// ...// 自動(dòng)重試} catch (MQBrokerException e) {// ...// 僅返回碼==NOT_IN_CURRENT_UNIT==205 時(shí)自動(dòng)重試// 其他情況不重試,拋異常} catch (InterruptedException e) {// ...// 不重試,拋異常}} else {break;}}if (sendResult != null) {return sendResult;}// 組裝返回的info信息,最后以MQClientException拋出// ... ...// 超時(shí)場(chǎng)景拋RemotingTooMuchRequestExceptionif (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}// 填充MQClientException異常信息// ...}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }一、消息有效性校驗(yàn)
源碼清單-7
Validators.checkMessage(msg, this.defaultMQProducer);在此方法中校驗(yàn)消息的有效性,包括對(duì) topic 和消息體的校驗(yàn)。topic 的命名必須符合規(guī)范,且避免使用內(nèi)置的系統(tǒng)消息 TOPIC。消息體長(zhǎng)度 > 0 && 消息體長(zhǎng)度 <= 102410244 = 4M 。
源碼清單-8
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)throws MQClientException {if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}// topicValidators.checkTopic(msg.getTopic());Validators.isNotAllowedSendTopic(msg.getTopic());// bodyif (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());} }二、發(fā)送重試機(jī)制
Producer 在消息發(fā)送不成功時(shí),會(huì)自動(dòng)重試,最多發(fā)送次數(shù) = retryTimesWhenSendFailed + 1 = 3 次 。
值得注意的是,并非所有異常情況都會(huì)重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會(huì)自動(dòng)重試:
- 發(fā)生 RemotingException,MQClientException 兩種異常之一時(shí)
- 發(fā)生 MQBrokerException 異常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 時(shí)
- SYNC 模式下,未發(fā)生異常且發(fā)送結(jié)果狀態(tài)非 SEND_OK
在每次發(fā)送消息之前,會(huì)先檢查是否在前面這兩步就已經(jīng)耗時(shí)超長(zhǎng)(超時(shí)時(shí)長(zhǎng)默認(rèn) 3000ms),若是,則不再繼續(xù)發(fā)送并且直接返回超時(shí),不再重試。這里說(shuō)明了 2 個(gè)問(wèn)題:
- producer 內(nèi)部自動(dòng)重試對(duì)業(yè)務(wù)應(yīng)用而言是無(wú)感知的,應(yīng)用看到的發(fā)送耗時(shí)是包含所有重試的耗時(shí)在內(nèi)的;
- 一旦超時(shí)意味著本次消息發(fā)送已經(jīng)以失敗告終,原因是超時(shí)。這個(gè)信息最后會(huì)以 RemotingTooMuchRequestException 的形式拋出。
這里需要指出的是,在 RocketMQ 官方文檔中指出,發(fā)送超時(shí)時(shí)長(zhǎng)是 10s,即 10000ms,網(wǎng)上許多人對(duì) rocketMQ 的超時(shí)時(shí)間解讀也認(rèn)為是 10s。然而代碼中卻明明白白寫著 3000ms,最終我 debug 之后確認(rèn),默認(rèn)超時(shí)時(shí)間確實(shí)是 3000ms。
三、broker 的異常規(guī)避機(jī)制
源碼清單-9
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);這行代碼是發(fā)送前選擇 queue 的過(guò)程。
這里涉及 RocketMQ 消息發(fā)送高可用的的一個(gè)核心機(jī)制,latencyFaultTolerance。這個(gè)機(jī)制是 Producer 負(fù)載均衡的一部分,通過(guò) sendLatencyFaultEnable 的值來(lái)控制,默認(rèn)是 false 關(guān)閉狀態(tài),不啟動(dòng) broker 故障延遲機(jī)制,值為 true 時(shí)啟用 broker 故障延遲機(jī)制,可由 Producer 主動(dòng)打開(kāi)。
選擇隊(duì)列時(shí),開(kāi)啟異常規(guī)避機(jī)制,則根據(jù) broker 的工作狀態(tài)避免選擇當(dāng)前狀態(tài)不佳的 broker 代理,不健康的 broker 會(huì)在一段時(shí)間內(nèi)被規(guī)避,不開(kāi)啟異常規(guī)避機(jī)制時(shí),則按順序選取下一個(gè)隊(duì)列,但在重試場(chǎng)景下會(huì)盡量選擇不同于上次發(fā)送 broker 的 queue。每次消息發(fā)送都會(huì)通過(guò) updateFaultItem 方法來(lái)維護(hù) broker 的狀態(tài)信息。
源碼清單-10
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 計(jì)算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個(gè)比30s小的延遲值,再按下標(biāo)判斷規(guī)避的周期,若30s,則是10min規(guī)避;// 否則,按上一次發(fā)送耗時(shí)來(lái)決定規(guī)避時(shí)長(zhǎng);long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);} }深入到 selectOneMessageQueue 方法內(nèi)部一探究竟:
源碼清單-11
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {// 開(kāi)啟異常規(guī)避try {int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 按順序取下一個(gè)message queue作為發(fā)送的queueMessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 當(dāng)前queue所在的broker可用,且與上一個(gè)queue的broker相同,// 或者第一次發(fā)送,則使用這個(gè)queueif (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}// 不開(kāi)啟異常規(guī)避,則隨機(jī)自增選擇Queuereturn tpInfo.selectOneMessageQueue(lastBrokerName); }四、RocketMQ 的三種 CommunicationMode
源碼清單-12
public enum CommunicationMode {SYNC,ASYNC,ONEWAY, }以上三種模式指的都是消息從發(fā)送方到達(dá) broker 的階段,不包含 broker 將消息投遞給訂閱方的過(guò)程。三種模式的發(fā)送方式的差異:
單向模式:ONEWAY。消息發(fā)送方只管發(fā)送,并不關(guān)心 broker 處理的結(jié)果如何。這種模式下,由于處理流程少,發(fā)送耗時(shí)非常小,吞吐量大,但不能保證消息可靠不丟,常用于流量巨大但不重要的消息場(chǎng)景,例如心跳發(fā)送等。
異步模式:ASYNC。消息發(fā)送方發(fā)送消息到 broker 后,無(wú)需等待 broker 處理,拿到的是 null 的返回值,而由一個(gè)異步的線程來(lái)做消息處理,處理完成后以回調(diào)的形式告訴發(fā)送方發(fā)送結(jié)果。異步處理時(shí)如有異常,返回發(fā)送方失敗結(jié)果之前,會(huì)經(jīng)過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)。這種模式下,發(fā)送方等待時(shí)長(zhǎng)較小,吞吐量較大,消息可靠,用于流量大但重要的消息場(chǎng)景。
同步模式:SYNC。消息發(fā)送方需等待 broker 處理完成并明確返回成功或失敗,在消息發(fā)送方拿到消息發(fā)送失敗的結(jié)果之前,也會(huì)經(jīng)歷過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)這種模式下,發(fā)送方會(huì)阻塞等待消息處理結(jié)果,等待時(shí)長(zhǎng)較長(zhǎng),消息可靠,用于流量不大但重要的消息場(chǎng)景。需要強(qiáng)調(diào)的是,事務(wù)消息的一階段半事務(wù)消息的處理是同步模式。
在 sendKernelImpl 方法中也可以看到具體的實(shí)現(xiàn)差異。ONEWAY 模式最為簡(jiǎn)單,不做任何處理。負(fù)責(zé)發(fā)送的 sendMessage 方法參數(shù)中,相比同步模式,異步模式多了回調(diào)方法、包含 topic 發(fā)送路由元信息的 topicPublishInfo、包含發(fā)送 broker 信息的 instance、包含發(fā)送隊(duì)列信息的 producer、重試次數(shù)。另外,異步模式下,會(huì)對(duì)有壓縮的消息先做 copy。
源碼清單-13
switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細(xì)過(guò)程:
Ⅱ 階段發(fā)送
源碼清單-3 體現(xiàn)了本地事務(wù)的執(zhí)行,localTransactionState 將本地事務(wù)執(zhí)行結(jié)果與事務(wù)消息二階段的發(fā)送關(guān)聯(lián)起來(lái)。
值得注意的是,如果一階段的發(fā)送結(jié)果是 SLAVENOTAVAILABLE,即便 broker 不可用時(shí),也會(huì)將 localTransactionState 置為 Rollback,此時(shí)將不會(huì)執(zhí)行本地事務(wù)。之后由 endTransaction 方法負(fù)責(zé)二階段提交,見(jiàn)源碼清單-4。具體到 endTransaction 的實(shí)現(xiàn):
源碼清單-14
public void endTransaction(final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 采用oneway的方式發(fā)送二階段消息this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout()); }在二階段發(fā)送時(shí),之所以用 oneway 的方式發(fā)送,個(gè)人理解這正是因?yàn)槭聞?wù)消息有一個(gè)特殊的可靠機(jī)制——回查。
消息回查
當(dāng) Broker 經(jīng)過(guò)了一個(gè)特定的時(shí)間,發(fā)現(xiàn)依然沒(méi)有得到事務(wù)消息的二階段是否要提交或者回滾的確切信息,Broker 不知道 Producer 發(fā)生了什么情況(可能 producer 掛了,也可能 producer 發(fā)了 commit 但網(wǎng)絡(luò)抖動(dòng)丟了,也可能……于是主動(dòng)發(fā)起回查。
事務(wù)消息的回查機(jī)制,更多的是在 broker 端的體現(xiàn)。RocketMQ 的 broker 以 Half 消息、Op 消息、真實(shí)消息三個(gè)不同的 topic 來(lái)將不同發(fā)送階段的事務(wù)消息進(jìn)行了隔離,使得 Consumer 只能看到最終確認(rèn) commit 需要投遞出去的消息。其中詳細(xì)的實(shí)現(xiàn)邏輯在本文中暫不多贅述,后續(xù)可另開(kāi)一篇專門來(lái)從 Broker 視角來(lái)解讀。
回到 Producer 的視角,當(dāng)收到了 Broker 的回查請(qǐng)求,Producer 將根據(jù)消息檢查本地事務(wù)狀態(tài),根據(jù)結(jié)果決定提交或回滾,這就要求 Producer 必須指定回查實(shí)現(xiàn),以備不時(shí)之需。當(dāng)然,正常情況下,并不推薦主動(dòng)發(fā)送 UNKNOW 狀態(tài),這個(gè)狀態(tài)毫無(wú)疑問(wèn)會(huì)給 broker 帶來(lái)額外回查開(kāi)銷,只在出現(xiàn)不可預(yù)知的異常情況時(shí)才啟動(dòng)回查機(jī)制,是一種比較合理的選擇。
另外,4.7.1 版本的事務(wù)回查并非無(wú)限回查,而是最多回查 15 次:
源碼清單-15
/*** The maximum number of times the message was checked, if exceed this value, this message will be discarded.*/ @ImportantField private int transactionCheckMax = 15;附錄
官方給出 Producer 的默認(rèn)參數(shù)如下(其中超時(shí)時(shí)長(zhǎng)的參數(shù),在前文中也已經(jīng)提到,debug 的結(jié)果是默認(rèn) 3000ms,并非 10000ms):
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的技术干货 | 源码解析 Github 上 14.1k Star 的 RocketMQ的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 新型DDoS来袭 | 基于STUN协议的
- 下一篇: mPaaS小程序技术架构深度解析