日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Rocketmq学习3——消息发送原理源码浅析

發布時間:2024/1/21 windows 36 coder
生活随笔 收集整理的這篇文章主要介紹了 Rocketmq学习3——消息发送原理源码浅析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一丶概述

RocketMQ 消息發送的原理流程可以分為以下幾個步驟:

1. 創建生產者

在發送消息前,客戶端首先需要創建一個消息生產者(Producer)實例,并設置必要的配置參數,如NameServer地址、生產組名稱、消息發送失敗的重試次數等。

2. 啟動生產者

創建生產者后,需要調用啟動方法來初始化生產者實例。在這個過程中,生產者會與NameServer建立連接,從NameServer獲取到所有Broker的地址信息。

3. 發送消息

消息發送分為同步發送、異步發送和單向發送三種方式:

  • 同步發送(Synchronous): 生產者發送消息后,會在發送線程中等待服務器的響應,直到收到消息發送確認。
  • 異步發送(Asynchronous): 生產者發送消息后,不會等待服務器的響應,而是通過回調接口處理服務器的響應。
  • 單向發送(One-way): 生產者只負責發送消息,不等待服務器響應,也不關心消息是否到達服務器。

無論采用哪種發送方式,消息發送的主要流程如下:

  1. 消息路由: 生產者通過負載均衡算法選擇一個隊列,通常是根據topic和隊列選擇一個Broker的一個隊列來發送消息。
  2. 消息發送: 生產者向選定的Broker發送消息。消息包含了topic、tags、keys、body等信息。
  3. 消息存儲: Broker接收到消息后,會將消息存儲到CommitLog(消息存儲文件)中。如果配置了消息重試或者高可靠性相關的配置,Broker可能會執行額外的消息復制或持久化操作以確保消息的可靠性。
  4. 寫入響應: Broker將消息存儲確認響應返回給生產者。如果是同步發送,生產者會在這一步等待該響應;如果是異步發送,生產者會在回調函數中處理該響應。

本篇,我們就來簡單看下rocketmq從生產者發送消息,學習一下其中優秀的設計!

二丶生產者消息發送

生產者消息發送本質是通過網絡io將消息發送到broker中,通常通過DefaultMQProducer#send(Message)進行簡單的消息發送,如下是其源碼

可看到如果設置了autoBatch并且消息本身不是一個批量消息,那么會調用sendByAccumulator(使用消息累計器進行發送,猜測會累計到內存中然后批量進行發送)

反之會調用sendDirect進行消息發送

1.sendByAccumulator 如何累計消息發送

rocketmq抽象出ProduceAccumulator進行消息的累計發送

ProduceAccumulator會將消息根據Topic和tag進行分組存儲,然后包裝為MessageBatch調用DefaultMQProducer進行發送

2.sendDirect

DefaultMQProducer消息發送會委托給DefaultMQProducerImpl進行發送,這兩個類名稱很像但是DefaultMQProducerImpl不是DefaultMQProducer的實現,二者是不同維度的:

  • DefaultMQProducer是給調用方使用的,相當于門面
  • DefaultMQProducerImpl:實現了MQProducerInner,真正實現消息發送機制

  1. 指定SendCallback:當異步發送消息的時候,可以實現此接口,實現消息發送成功or失敗后的回調
  2. 指定MessageQueue:MessageQueue是由TopicbrokerqueueId組成,一個topic可以分布在多個Broker上(橫向擴展),一個broker上可以由多個queue(多個queue并行消費提升吞吐量),因此通過發送消息指定MessageQueue可以實現消息的局部有序(消費者使用MessageListenerOrderly單線程進行消費)

下面我們來看看消息發送的具體實現,這部分代碼在DefaultMQProducerImpl#sendDefaultImpl

1.獲取路由信息

獲取路由信息,即從生產者向 NameServer 查詢特定 Topic 的路由信息。這個路由信息包括了這個 Topic 有哪些 Broker 持有,以及這些 Broker 上各自的 Queue 數據

  • NameServer 是 RocketMQ 中的一個關鍵組件,起到了服務注冊中心的作用。所有的 Broker 啟動時會向所有的 NameServer 注冊,包括其 IP 地址、端口、存活狀態以及所持有的 Topic 信息。NameServer 會持有整個消息系統的 Broker 服務器列表及其路由信息。
  • 當生產者啟動時,它會根據配置好的 NameServer 地址列表與 NameServer 集群建立連接。
  • 生產者會在本地緩存從 NameServer 獲取到的路由信息,以便快速選擇目標 Queue 進行消息發送。為了確保路由信息的準確性,生產者會定期(如每隔30秒)或在發送消息時發現路由信息不可用時,重新從 NameServer 更新這些信息,并且生產者發送消息的時候根據本地緩存的路由信息選擇一個 Queue 來發送消息。

通過這種方式,RocketMQ 確保生產者能夠及時獲取和更新路由信息,以及將消息發送到正確的 Broker 和 Queue。這個機制也使得 RocketMQ 能夠在 Broker 或隊列變化時動態適應,保證消息傳輸的高可用性和可擴展性。

生產者會優先從ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable中獲取路由,反之使用rpc請求nameServer獲取路由信息

另外在生產者啟動的時候,會觸發MQClientInstance的start,其中會使用juc調度線程池進行路由信息的定期更新(默認30秒一次)。

這里居然沒有使用長輪詢,理論上長輪詢相比于這種周期請求有更好的及時性,rocketmq可能是考慮到

  • 長輪詢的方式需nameServer維護連接狀態,而周期輪詢對于nameServer負擔更小
  • 周期請求可以讓生產者設置周期頻率
  • 流量更加均勻:長輪詢在路由信息發生變化的時候,nameServer需要立馬將變化后的信息發送給hang住的producer,不如周期輪詢來得流量均衡。
  • 大部分情況下,路由信息不會頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對于配置變更及時性有比較高的要求。

2.負載均衡的選擇一個MessageQueue

如下,如果是同步發送消息,一般會嘗試3次,在獲取到路由信息后會負載均衡的選擇一個MessageQueue進行發送。

RocketMq支持三種選擇MessageQueue的方式

  • 發送消息的時候,傳入MessageQueueSelector的實現選擇隊列;

  • 未開啟Broker故障延遲機制(sendLatencyFaultEnable:false),會采用默認輪訓機制(默認是此種實現方式)

  • 開啟Broker故障延遲機制(sendLatencyFaultEnable:true),會根據brokerName的可用性選擇隊列發送(當需要順序消息的時候不建議打開,會影響到消息的順序性)

    其中是否可用,是否可達,依賴LatencyFaultTolerance進行實現:

    LatencyFaultTolerance 實現了一個基于延遲的容錯策略。它記錄了每個 Broker 的歷史網絡延遲記錄和可用性狀態,并根據這些信息智能選擇最佳的 Broker 進行消息發送。原理包括以下幾個關鍵點:

    • 延遲記錄:每次發送消息時,LatencyFaultTolerance 都會記錄下發送操作的延遲時間。如果發送成功,那么這次操作的延遲時間就會被記錄下來。
    • 故障切換:如果發送消息時發生超時或異常,LatencyFaultTolerance 會將該 Broker 標記為不可用,并計算一個“不可用時長”。在該時長內,Broker 將不會被選中發送消息。
    • 動態容錯:
      LatencyFaultTolerance 會根據之前記錄的延遲時間,動態計算每個 Broker 的權重,并選擇權重最小(表示網絡狀態最好)的 Broker 進行消息發送。
    • 自動恢復:
      被標記為不可用的 Broker 不是永久性的。隨著時間的推移,Broker 的狀態可以從不可用恢復到可用,這通常是通過“不可用時長”來確定的。一旦超過這個時長,Broker 將重新參與到Broker選擇過程中。
    • Broker選擇:
      生產者在發送消息前會從 LatencyFaultTolerance 中獲取一個推薦的 Broker。選擇過程排除了不可用的 Broker,并考慮了網絡延遲和Broker的歷史表現。

3.消息發送

至此,我們以及選擇了一個MessageQueue接下來就是發送消息了。

在發生之前會從路由信息中獲取發送的地址,這里只會選擇master角色的broker進行發送

接下來會回調一些擴展性的鉤子,如CheckForbiddenHook,SendMessageHook。
然后調用MQClientAPIImpl#sendMessage進行發送,最終調用RemotingClient進行消息發送,RemotingClient是rocketmq對網絡通信的實現

無論是單向,還是異步,還是同步,最終都是使用tcp協議進行發送,這里rocketmq使用了netty提供高效的網絡通信。源碼如下:

netty的部分,不做過多贅述,詳細學習:Netty源碼學習7——netty是如何發送數據的 - Cuzzz - 博客園 (cnblogs.com)

三丶總結

感覺學到了什么,又感覺什么都沒學到

  • NameServer:實現producor,broker,consumer的解耦合,互相不需要感知彼此的村子,本質是一個注冊中心。
  • 路由信息使用定期輪詢,而不是長輪詢
    • 長輪詢的方式需nameServer維護連接狀態,而周期輪詢對于nameServer負擔更小
    • 周期請求可以讓生產者設置周期頻率
    • 流量更加均勻:長輪詢在路由信息發生變化的時候,nameServer需要立馬將變化后的信息發送給hang住的producer,不如周期輪詢來得流量均衡。
    • 大部分情況下,路由信息不會頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對于配置變更及時性有比較高的要求。
  • 負載均衡:
    • rocketmq的負載均衡,大多實在客戶端做的,在消息發送中的體現就是,producer自己實現負載均衡,而不是由一個中心化的網關實現,這樣去中心化的設計,利于producer的橫向擴展!
    • 默認情況下使用輪詢,而且使用ThreadLocal記錄輪詢到的index,一定程度上減少大量消息發送時候的鎖競爭
    • LatencyFaultTolerance:基于每一次發送消息的統計信息,如果發送消息時發生超時或異常,LatencyFaultTolerance 會將該 Broker 標記為不可用,并計算一個“不可用時長”。在該時長內,Broker 將不會被選中發送消息

總結

以上是生活随笔為你收集整理的Rocketmq学习3——消息发送原理源码浅析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。