Rocketmq学习3——消息发送原理源码浅析
一丶概述
RocketMQ 消息發送的原理流程可以分為以下幾個步驟:
1. 創建生產者
在發送消息前,客戶端首先需要創建一個消息生產者(Producer)實例,并設置必要的配置參數,如NameServer地址、生產組名稱、消息發送失敗的重試次數等。
2. 啟動生產者
創建生產者后,需要調用啟動方法來初始化生產者實例。在這個過程中,生產者會與NameServer建立連接,從NameServer獲取到所有Broker的地址信息。
3. 發送消息
消息發送分為同步發送、異步發送和單向發送三種方式:
- 同步發送(Synchronous): 生產者發送消息后,會在發送線程中等待服務器的響應,直到收到消息發送確認。
- 異步發送(Asynchronous): 生產者發送消息后,不會等待服務器的響應,而是通過回調接口處理服務器的響應。
- 單向發送(One-way): 生產者只負責發送消息,不等待服務器響應,也不關心消息是否到達服務器。
無論采用哪種發送方式,消息發送的主要流程如下:
- 消息路由: 生產者通過負載均衡算法選擇一個隊列,通常是根據topic和隊列選擇一個Broker的一個隊列來發送消息。
- 消息發送: 生產者向選定的Broker發送消息。消息包含了topic、tags、keys、body等信息。
-
消息存儲: Broker接收到消息后,會將消息存儲到
CommitLog(消息存儲文件)中。如果配置了消息重試或者高可靠性相關的配置,Broker可能會執行額外的消息復制或持久化操作以確保消息的可靠性。 - 寫入響應: Broker將消息存儲確認響應返回給生產者。如果是同步發送,生產者會在這一步等待該響應;如果是異步發送,生產者會在回調函數中處理該響應。
本篇,我們就來簡單看下rocketmq從生產者發送消息,學習一下其中優秀的設計!
二丶生產者消息發送
生產者消息發送本質是通過網絡io將消息發送到broker中,通常通過DefaultMQProducer#send(Message)進行簡單的消息發送,如下是其源碼
可看到如果設置了autoBatch并且消息本身不是一個批量消息,那么會調用sendByAccumulator(使用消息累計器進行發送,猜測會累計到內存中然后批量進行發送)
反之會調用sendDirect進行消息發送
1.sendByAccumulator 如何累計消息發送
rocketmq抽象出ProduceAccumulator進行消息的累計發送
ProduceAccumulator會將消息根據Topic和tag進行分組存儲,然后包裝為MessageBatch調用DefaultMQProducer進行發送
2.sendDirect
DefaultMQProducer消息發送會委托給DefaultMQProducerImpl進行發送,這兩個類名稱很像但是DefaultMQProducerImpl不是DefaultMQProducer的實現,二者是不同維度的:
- DefaultMQProducer是給調用方使用的,相當于門面
- DefaultMQProducerImpl:實現了MQProducerInner,真正實現消息發送機制
- 指定SendCallback:當異步發送消息的時候,可以實現此接口,實現消息發送成功or失敗后的回調
- 指定MessageQueue:MessageQueue是由
Topic,broker,queueId組成,一個topic可以分布在多個Broker上(橫向擴展),一個broker上可以由多個queue(多個queue并行消費提升吞吐量),因此通過發送消息指定MessageQueue可以實現消息的局部有序(消費者使用MessageListenerOrderly單線程進行消費)
下面我們來看看消息發送的具體實現,這部分代碼在DefaultMQProducerImpl#sendDefaultImpl
1.獲取路由信息
獲取路由信息,即從生產者向 NameServer 查詢特定 Topic 的路由信息。這個路由信息包括了這個 Topic 有哪些 Broker 持有,以及這些 Broker 上各自的 Queue 數據
- NameServer 是 RocketMQ 中的一個關鍵組件,起到了服務注冊中心的作用。所有的 Broker 啟動時會向所有的 NameServer 注冊,包括其 IP 地址、端口、存活狀態以及所持有的 Topic 信息。NameServer 會持有整個消息系統的 Broker 服務器列表及其路由信息。
- 當生產者啟動時,它會根據配置好的 NameServer 地址列表與 NameServer 集群建立連接。
- 生產者會在本地緩存從 NameServer 獲取到的路由信息,以便快速選擇目標 Queue 進行消息發送。為了確保路由信息的準確性,生產者會定期(如每隔30秒)或在發送消息時發現路由信息不可用時,重新從 NameServer 更新這些信息,并且生產者發送消息的時候根據本地緩存的路由信息選擇一個 Queue 來發送消息。
通過這種方式,RocketMQ 確保生產者能夠及時獲取和更新路由信息,以及將消息發送到正確的 Broker 和 Queue。這個機制也使得 RocketMQ 能夠在 Broker 或隊列變化時動態適應,保證消息傳輸的高可用性和可擴展性。
生產者會優先從ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable中獲取路由,反之使用rpc請求nameServer獲取路由信息
另外在生產者啟動的時候,會觸發MQClientInstance的start,其中會使用juc調度線程池進行路由信息的定期更新(默認30秒一次)。
這里居然沒有使用長輪詢,理論上長輪詢相比于這種周期請求有更好的及時性,rocketmq可能是考慮到
- 長輪詢的方式需nameServer維護連接狀態,而周期輪詢對于nameServer負擔更小
- 周期請求可以讓生產者設置周期頻率
- 流量更加均勻:長輪詢在路由信息發生變化的時候,nameServer需要立馬將變化后的信息發送給hang住的producer,不如周期輪詢來得流量均衡。
- 大部分情況下,路由信息不會頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對于配置變更及時性有比較高的要求。
2.負載均衡的選擇一個MessageQueue
如下,如果是同步發送消息,一般會嘗試3次,在獲取到路由信息后會負載均衡的選擇一個MessageQueue進行發送。
RocketMq支持三種選擇MessageQueue的方式
-
發送消息的時候,傳入MessageQueueSelector的實現選擇隊列;
-
未開啟Broker故障延遲機制(sendLatencyFaultEnable:false),會采用默認輪訓機制(默認是此種實現方式)
-
開啟Broker故障延遲機制(sendLatencyFaultEnable:true),會根據brokerName的可用性選擇隊列發送(當需要順序消息的時候不建議打開,會影響到消息的順序性)
其中是否可用,是否可達,依賴LatencyFaultTolerance進行實現:
LatencyFaultTolerance 實現了一個基于延遲的容錯策略。它記錄了每個 Broker 的歷史網絡延遲記錄和可用性狀態,并根據這些信息智能選擇最佳的 Broker 進行消息發送。原理包括以下幾個關鍵點:
- 延遲記錄:每次發送消息時,LatencyFaultTolerance 都會記錄下發送操作的延遲時間。如果發送成功,那么這次操作的延遲時間就會被記錄下來。
- 故障切換:如果發送消息時發生超時或異常,LatencyFaultTolerance 會將該 Broker 標記為不可用,并計算一個“不可用時長”。在該時長內,Broker 將不會被選中發送消息。
- 動態容錯:
LatencyFaultTolerance 會根據之前記錄的延遲時間,動態計算每個 Broker 的權重,并選擇權重最小(表示網絡狀態最好)的 Broker 進行消息發送。 - 自動恢復:
被標記為不可用的 Broker 不是永久性的。隨著時間的推移,Broker 的狀態可以從不可用恢復到可用,這通常是通過“不可用時長”來確定的。一旦超過這個時長,Broker 將重新參與到Broker選擇過程中。 - Broker選擇:
生產者在發送消息前會從 LatencyFaultTolerance 中獲取一個推薦的 Broker。選擇過程排除了不可用的 Broker,并考慮了網絡延遲和Broker的歷史表現。
3.消息發送
至此,我們以及選擇了一個MessageQueue接下來就是發送消息了。
在發生之前會從路由信息中獲取發送的地址,這里只會選擇master角色的broker進行發送
接下來會回調一些擴展性的鉤子,如CheckForbiddenHook,SendMessageHook。
然后調用MQClientAPIImpl#sendMessage進行發送,最終調用RemotingClient進行消息發送,RemotingClient是rocketmq對網絡通信的實現
無論是單向,還是異步,還是同步,最終都是使用tcp協議進行發送,這里rocketmq使用了netty提供高效的網絡通信。源碼如下:
netty的部分,不做過多贅述,詳細學習:Netty源碼學習7——netty是如何發送數據的 - Cuzzz - 博客園 (cnblogs.com)
三丶總結
感覺學到了什么,又感覺什么都沒學到
- NameServer:實現producor,broker,consumer的解耦合,互相不需要感知彼此的村子,本質是一個注冊中心。
- 路由信息使用定期輪詢,而不是長輪詢
- 長輪詢的方式需nameServer維護連接狀態,而周期輪詢對于nameServer負擔更小
- 周期請求可以讓生產者設置周期頻率
- 流量更加均勻:長輪詢在路由信息發生變化的時候,nameServer需要立馬將變化后的信息發送給hang住的producer,不如周期輪詢來得流量均衡。
- 大部分情況下,路由信息不會頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對于配置變更及時性有比較高的要求。
- 負載均衡:
- rocketmq的負載均衡,大多實在客戶端做的,在消息發送中的體現就是,producer自己實現負載均衡,而不是由一個中心化的網關實現,這樣去中心化的設計,利于producer的橫向擴展!
- 默認情況下使用輪詢,而且使用ThreadLocal記錄輪詢到的index,一定程度上減少大量消息發送時候的鎖競爭
- LatencyFaultTolerance:基于每一次發送消息的統計信息,如果發送消息時發生超時或異常,LatencyFaultTolerance 會將該 Broker 標記為不可用,并計算一個“不可用時長”。在該時長內,Broker 將不會被選中發送消息
總結
以上是生活随笔為你收集整理的Rocketmq学习3——消息发送原理源码浅析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: xshell配置隧道转移规则
- 下一篇: 新接手一个业务系统,我是这么熟悉的