日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

525、Java工程师的进阶之路 -【 RocketMQ (二)】 2022.01.06

發布時間:2024/3/13 java 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 525、Java工程师的进阶之路 -【 RocketMQ (二)】 2022.01.06 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

    • 1. RocketMQ 設計目的
      • 1.1. 發布/訂閱
      • 1.2. 消息優先級
      • 1.3. 消息順序
      • 1.4. 消息過濾
      • 1.5. 消息持久化
      • 1.6. 消息可靠性
      • 1.7. 消息實時性
      • 1.8. 保證至少一次
      • 1.9. 保證只有一次
      • 1.10. Broker的Buffer溢出
      • 1.11. 回溯消費
      • 1.12. 消息堆積
      • 1.13. 分布式事務
      • 1.14. 定時消息
      • 1.15. 消息重試
    • 2. RocketMQ 消費模型
    • 3. RocketMQ 網絡模型
    • 4. RocketMQ 存儲模型
    • 5、RocketMQ 高可用性
    • 6. RocketMQ 定時/延時消息
    • 7. RocketMQ 事務消息
    • 8、參考鏈接

當今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的 Kafka,阿里巴巴自主開發 RocketMQ 等。消息隊列已經逐漸成為企業IT系統內部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。

1. RocketMQ 設計目的

1.1. 發布/訂閱

發布訂閱是消息中間件的最基本功能,也是相對于傳統RPC通信而言。

1.2. 消息優先級

規范中描述的優先級是指在一個消息隊列中,每條消息都有不同的優先級,一般用整數來描述,優先級高的消息先投遞,如果消息完全在一個內存隊列中,那么在投遞前可以按照優先級排序,令優先級高的先投遞。
由于RocketMQ所有消息都是持久化的,所以如果按照優先級來排序,開銷會非常大,因此RocketMQ沒有特意支持消息優先級,但是可以通過變通的方式實現類似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列, 將不同優先級發送到不同隊列即可。
對于優先級問題,可以歸納為2類:

  • 只要達到優先級目的即可,不是嚴格意義上的優先級,通常將優先級劃分為高、中、低,或者再多幾個級別。每個優先級可以用不同的topic表示,發消息時,指定不同的topic來表示優先級,這種方式可以解決絕大部分的優先級問題,但是對業務的優先級精確性做了妥協。

  • 嚴格的優先級,優先級用整數表示,例如0 ~ 65535,這種優先級問題一般使用不同topic解決就非常不合適。如果要讓MQ解決此問題,會對MQ的性能造成非常大的影響。這里要確保一點,業務上是否確實需要這種嚴格的優先級,如果將優先級壓縮成幾個,對業務的影響有多大?

  • 1.3. 消息順序

    消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條消息,分別是訂單創建,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以并行消費的。

    RocketMQ可以嚴格的保證消息有序。

    1.4. 消息過濾

    Broker端消息過濾
    在Broker中,按照Consumer的要求做過濾,優點是減少了對于Consumer無用消息的網絡傳輸。
    缺點是增加了Broker的負擔,實現相對復雜。

  • 淘寶Notify支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎可以滿足最苛刻的過濾需求。
  • 淘寶RocketMQ支持按照簡單的Message Tag過濾,也支持按照Message Header、body進行過濾。
  • CORBA Notification規范中也支持靈活的語法表達式過濾。
  • Consumer端消息過濾
    這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的消息要傳輸到Consumer端。

    1.5. 消息持久化

    消息中間件通常采用的幾種持久化方式:

  • 持久化到數據庫,例如Mysql。
  • 持久化到KV存儲,例如levelDB、伯克利DB等KV存儲系統。
  • 文件記錄形式持久化,例如Kafka,RocketMQ
  • 對內存數據做一個持久化鏡像,例如beanstalkd,VisiNotify
  • (1)、(2)、(3)三種持久化方式都具有將內存隊列Buffer進行擴展的能力,(4)只是一個內存的鏡像,作用是當Broker掛掉重啟后仍然能將之前內存的數據恢復出來。
  • JMS與CORBA Notification規范沒有明確說明如何持久化,但是持久化部分的性能直接決定了整個消息中間件的性能。

    RocketMQ充分利用Linux文件系統內存cache來提高性能。

    1.6. 消息可靠性

    影響消息可靠性的幾種情況:

  • Broker正常關閉
  • Broker異常Crash
  • OS Crash
  • 機器掉電,但是能立即恢復供電情況。
  • 機器無法開機(可能是cpu、主板、內存等關鍵設備損壞)
  • 磁盤設備損壞。
  • (1)、(2)、(3)、(4)四種情況都屬于硬件資源可立即恢復情況,RocketMQ在這四種情況下能保證消息不丟,或者丟失少量數據(依賴刷盤方式是同步還是異步)。

    (5)、(6)屬于單點故障,且無法恢復,一旦發生,在此單點上的消息全部丟失。RocketMQ在這兩種情況下,通過異步復制,可保證99%的消息不丟,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。

    RocketMQ從3.0版本開始支持同步雙寫。

    1.7. 消息實時性

    在消息不堆積情況下,消息到達Broker后,能立刻到達Consumer。

    RocketMQ使用長輪詢Pull方式,可保證消息非常實時,消息實時性不低于Push。

    1.8. 保證至少一次

    是指每個消息必須投遞一次。

    RocketMQ Consumer先pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,所以RocketMQ可以很好的支持此特性。

    1.9. 保證只有一次

  • 發送消息階段,不允許發送重復的消息。
  • 消費消息階段,不允許消費重復的消息。
  • 只有以上兩個條件都滿足情況下,才能認為消息是“Exactly Only Once”,而要實現以上兩點,在分布式系統環境下,不可避免要產生巨大的開銷。所以RocketMQ為了追求高性能,并不保證此特性,要求在業務上進行去重,也就是說消費消息要做到冪等性。

    RocketMQ雖然不能嚴格保證不重復,但是正常情況下很少會出現重復發送、消費情況,只有網絡異常,Consumer啟停等異常情況下會出現消息重復。

    1.10. Broker的Buffer溢出

    Broker的Buffer通常指的是Broker中一個隊列的內存Buffer大小,這類Buffer通常大小有限,如果Buffer滿了以后怎么辦?
    下面是CORBA Notification規范中處理方式:

  • 拒絕新來的消息,向Producer返回RejectNewEvents錯誤碼。
  • 按照特定策略丟棄已有消息:
    • AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
    • FifoOrder - The first event received will be the first discarded.
    • LifoOrder - The last event received will be the first discarded.
    • PriorityOrder - Events should be discarded in priority order, such that lower priority events will be discarded before higher priority events.
    • DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first.

    RocketMQ沒有內存Buffer概念,RocketMQ的隊列都是持久化磁盤,數據定期清除。

    對于此問題的解決思路,RocketMQ同其他MQ有非常顯著的區別,RocketMQ的內存Buffer抽象成一個無限長度的隊列,不管有多少數據進來都能裝得下,這個無限是有前提的,Broker會定期刪除過期的數據,例如Broker只保存3天的消息,那么這個Buffer雖然長度無限,但是3天前的數據會被從隊尾刪除。

    此問題的本質原因是網絡調用存在不確定性,即既不成功也不失敗的第三種狀態,所以才產生了消息重復性問題。

    1.11. 回溯消費

    回溯消費是指Consumer已經消費成功的消息,由于業務上需求需要重新消費,要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。并且重新消費一般是按照時間維度,例如由于Consumer系統故障,恢復后需要重新消費1小時前的數據,那么Broker要提供一種機制,可以按照時間維度來回退消費進度。

    RocketMQ支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

    1.12. 消息堆積

    消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峰,保證后端系統的穩定性,這就要求消息中間件具有一定的消息堆積能力,消息堆積分以下兩種情況:

  • 消息堆積在內存Buffer,一旦超過內存Buffer,可以根據一定的丟棄策略來丟棄消息,如CORBA Notification規范中描述。適合能容忍丟棄消息的業務,這種情況消息的堆積能力主要在于內存Buffer大小,而且消息堆積后,性能下降不會太大,因為內存中數據多少對于對外提供的訪問能力影響有限。
  • 消息堆積到持久化存儲系統中,例如DB,KV存儲,文件記錄形式。 當消息不能在內存Cache命中時,要不可避免的訪問磁盤,會產生大量讀IO,讀IO的吞吐量直接決定了消息堆積后的訪問能力。
  • 評估消息堆積能力主要有以下四點:

  • 消息能堆積多少條,多少字節?即消息的堆積容量。
  • 消息堆積后,發消息的吞吐量大小,是否會受堆積影響?
  • 消息堆積后,正常消費的Consumer是否會受影響?
  • 消息堆積后,訪問堆積在磁盤的消息時,吞吐量有多大?
  • 1.13. 分布式事務

    已知的幾個分布式事務規范,如XA,JTA等。其中XA規范被各大數據庫廠商廣泛支持,如Oracle,Mysql等。其中XA的TM實現佼佼者如Oracle Tuxedo,在金融、電信等領域被廣泛應用。
    分布式事務涉及到兩階段提交問題,在數據存儲方面的方面必然需要KV存儲的支持,因為第二階段的提交回滾需要修改消息狀態,一定涉及到根據Key去查找Message的動作。RocketMQ在第二階段繞過了根據Key去查找Message的問題,采用第一階段發送Prepared消息時,拿到了消息的Offset,第二階段通過Offset去訪問消息,并修改狀態,Offset就是數據的地址。
    RocketMQ這種實現事務方式,沒有通過KV存儲做,而是通過Offset方式,存在一個顯著缺陷,即通過Offset更改數據,會令系統的臟頁過多,需要特別關注。

    1.14. 定時消息

    定時消息是指消息發到Broker后,不能立刻被Consumer消費,要到特定的時間點或者等待特定的時間后才能被消費。
    如果要支持任意的時間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的產生巨大性能開銷。
    RocketMQ支持定時消息,但是不支持任意時間精度,支持特定的level,例如定時5s,10s,1m等。

    1.15. 消息重試

    Consumer消費消息失敗后,要提供一種重試機制,令消息再消費一次。

    Consumer消費消息失敗通常可以認為有以下幾種情況:

  • 由于消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其他消息,而這條失敗的消息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過10s秒后再重試。
  • 由于依賴的下游應用服務不可用,例如db連接不可用,外系統網絡不可達等。遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用sleep 30s,再消費下一條消息,這樣可以減輕Broker重試消息的壓力。
  • 2. RocketMQ 消費模型

    一般來說消息隊列的消費模型分為兩種,基于推送的消息 (push) 模型和基于拉取 (poll) 的消息模型。
    基于推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者后,標記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。比如當我們把已經把消息發送給消費者之后,由于消費進程掛掉或者由于網絡原因沒有收到這條消息,如果我們在消費代理將其標記為已消費,這個消息就永久丟失了。如果我們利用生產者收到消息后回復這種方法,消息代理需要記錄消費狀態,這種不可取。
    用過RocketMQ的同學肯定不禁會想到,在RocketMQ中不是提供了兩種消費者嗎?

    2.1 MQPullConsumer 和 MQPushConsumer

    其中 MQPushConsumer 不就是我們的推模型嗎?其實這兩種模型都是客戶端主動去拉消息,其中的實現區別如下:

    MQPullConsumer:每次拉取消息需要傳入拉取消息的 offset 和每次拉取多少消息量,具體拉取哪里的消息,拉取多少是由客戶端控制。

    MQPushConsumer:同樣也是客戶端主動拉取消息,但是消息進度是由服務端保存,Consumer 會定時上報自己消費到哪里,所以 Consumer 下次消費的時候是可以找到上次消費的點,一般來說使用PushConsumer 我們不需要關心 offset 和拉取多少數據,直接使用即可。

    2.2 集群消費和廣播消費

    消費模式我們分為兩種,集群消費,廣播消費:

    集群消費: 同一個 GroupId 都屬于一個集群,一般來說一條消息只會被任意一個消費者處理。

    廣播消費:廣播消費的消息會被集群中所有消費者進行消息,但是要注意一下因為廣播消費的 offset 在服務端保存成本太高,所以客戶端每一次重啟都會從最新消息消費,而不是上次保存的 offset。

    3. RocketMQ 網絡模型

    在 Kafka 中使用的原生的 socket 實現網絡通信,而RocketMQ 使用的是 Netty 網絡框架,現在越來越多的中間件都不會直接選擇原生的 socket,而是使用的 Netty 框架,主要得益于下面幾個原因:

    3.1 API 使用簡單,不需要關心過多的網絡細節,更專注于中間件邏輯。

    3.2 性能高。成熟穩定,jdk nio 的 bug 都被修復了。

    選擇框架是一方面,而想要保證網絡通信的高效,網絡線程模型也是一方面,我們常見的有 1+N (1 個 Acceptor 線程,N 個 IO 線程),1+N+M (1個 acceptor 線程,N 個 IO 線程,M 個 worker線程)等模型,RocketMQ 使用的是 1+N1+N2+M 的模型,如下圖所示:

    1個 acceptor 線程,N1 個 IO 線程,N2 個線程用來做 Shake-hand, SSL 驗證, 編解碼; M 個線程用來做業務處理。這樣的好處將編解碼,和 SSL 驗證等一些可能耗時的操作放在了一個單獨的線程池,不會占據我們業務線程和 IO 線程。

    4. RocketMQ 存儲模型

    做為一個好的消息系統,高性能的存儲,高可用都不可少。
    RocketMQ 和 Kafka 的存儲核心設計有很大的不同,所以其在寫入性能方面也有很大的差別,這是16年阿里中間件團隊對 RocketMQ 和 Kafka 不同 Topic 下做的性能測試:

    從圖上可以看出:

    • Kafka 在 Topic 數量由 64 增長到 256 時,吞吐量下降了 98.37%。
    • RocketMQ 在 Topic 數量由 64 增長到 256 時,吞吐量只下降了 16%。

    這是為什么呢?kafka 一個 topic 下面的所有消息都是以 partition 的方式分布式的存儲在多個節點上。同時在 kafka 的機器上,每個 Partition 其實都會對應一個日志目錄,在目錄下面會對應多個日志分段。所以如果 Topic 很多的時候 Kafka 雖然寫文件是順序寫,但實際上文件過多,會造成磁盤 IO 競爭非常激烈。

    那 RocketMQ 為什么在多 Topic 的情況下,依然還能很好的保持較多的吞吐量呢?我們首先來看一下RocketMQ 中比較關鍵的文件:

    commitLog:消息主體以及元數據的存儲主體,存儲 Producer 端寫入的消息主體內容, 消息內容不是定長的。單個文件大小默認 1G ,文件名長度為 20 位,左邊補零,剩余為起始偏移量,比如00000000000000000000 代表了第一個文件,起始偏移量為 0,文件大小為 1G=1073741824;當第一個文件寫滿了,第二個文件為 00000000001073741824,起始偏移量為 1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;

    config:保存一些配置信息,包括一些 Group,Topic 以及 Consumer 消費 offset 等信息。

    consumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由于 RocketMQ 是基于主題topic 的訂閱模式,消息消費是針對主題進行的,如果要遍歷 commitlog 文件中根據 topic 檢索消息是非常低效的。

    Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在 commitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。

    consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夾的組織方式如下:topic/queue/file 三層組織結構,具體存儲路徑為:HOME \store\index${fileName},文件名fileName 是以創建時的時間戳命名的,固定的單個 IndexFile 文件大小約為 400M,一個 IndexFile 可以保存 2000W 個索引,IndexFile 的底層存儲設計為在文件系統中實現 HashMap 結構,故 rocketmq 的索引文件其底層實現為 hash 索引。

    我們發現我們的消息主體數據并沒有像 Kafka 一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入 IO 競爭就非常小,可以在很多 Topic 的時候依然保持很高的吞吐量。有同學說這里的 ConsumeQueue 寫是在不停的寫入呢,并且 ConsumeQueue 是以 Queue 維度來創建文件,那么文件數量依然很多,在這里 ConsumeQueue 的寫入的數據量很小,每條消息只有 20 個字節,30W 條數據也才 6M 左右,所以其實對我們的影響相對 Kafka 的 Topic 之間影響是要小很多的。我們整個的邏輯可以如下:

    Producer 不斷的再往 CommitLog 添加新的消息,有一個定時任務 ReputService 會不斷的掃描新添加進來的 CommitLog,然后不斷的去構建 ConsumerQueue 和 Index。


    注意:這里指的都是普通的硬盤,在 SSD 上面多個文件并發寫入和單個文件寫入影響不大。
    Kafka 中每個 Partition 都會是一個單獨的文件,所以當消費某個消息的時候,會很好的出現順序讀,我們知道 OS 從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取,將數據放入PageCache,所以 Kafka 的讀取消息性能比較好。

    RocketMQ讀取流程如下:

  • 先讀取 ConsumerQueue 中的 offset 對應 CommitLog 物理的 offset
  • 根據 offset 讀取 CommitLog
  • ConsumerQueue 也是每個 Queue 一個單獨的文件,并且其文件體積小,所以很容易利用 PageCache 提高性能。而 CommitLog,由于同一個 Queue 的連續消息在 CommitLog 其實是不連續的,所以會造成隨機讀, RocketMQ 對此做了幾個優化:

    • Mmap 映射讀取,Mmap 的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷
    • 使用 DeadLine 調度算法 + SSD 存儲盤

    由于 Mmap 映射受到內存限制,當不在 Mmmap 映射這部分數據的時候(也就是消息堆積過多),默認是內存的40%,會將請求發送到 SLAVE, 減緩 Master 的壓力。

    5、RocketMQ 高可用性

    集群模式
    我們首先需要選擇一種集群模式,來適應我們可忍耐的可用程度,一般來說分為三種:

    • 單 Master: 這種模式,可用性最低,但是成本也是最低,一旦宕機,所有都不可用。這種一般只適用于本地測試。
    • 單 Master 多 Slave: 這種模式,可用性一般,如果主宕機,那么所有寫入都不可用,讀取依然可用,如果 master 磁盤損壞,可以依賴 slave 的數據。
    • 多 Master: 這種模式,可用性一般,如果出現部分 master 宕機,那么這部分 master 上的消息都不可消費,也不可寫數據,如果一個 Topic 的隊列在多個 master 上都有,那么可以保證沒有宕機的那部分可以正常消費,寫入。如果 master 的磁盤損壞會導致消息丟失。
    • 多 Master 多 Slave:這種模式,可用性最高,但是維護成本也最高,當 master 宕機了之后,只會出現在這部分 master 上的隊列不可寫入,但是讀取依然是可以的,并且如果 master 磁盤損壞,可以依賴 slave 的數據。

    一般來說投入生產環境的話都會選擇第四種,來保證最高的可用性。

    消息的可用性
    當我們選擇好了集群模式之后,那么我們需要關心的就是怎么去存儲和復制這個數據,RocketMQ 對消息的刷盤提供了同步和異步的策略來滿足我們的,當我們選擇同步刷盤之后,如果刷盤超時會給返回 FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關信息,選擇同步刷盤可以盡最大程度滿足我們的消息不會丟失。
    除了存儲有選擇之后,我們的主從同步提供了同步和異步兩種模式來進行復制,當然選擇同步可以提升可用性,但是消息的發送 RT 時間會下降 10% 左右。

    Dleger-RocketMQ
    我們上面對于 master-slave 部署模式已經做了很多分析,我們發現,當 master 出現問題的時候,我們的寫入怎么都會不可用,除非恢復 master,或者手動將我們的 slave 切換成 master,導致了我們的 slave 在多數情況下只有讀取的作用。RocketMQ 在最近的幾個版本中推出了 Dleger-RocketMQ,使用 Raft 協議復制 CommitLog,并且自動進行選主,這樣 master 宕機的時候,寫入依然保持可用。

    6. RocketMQ 定時/延時消息

    定時消息和延時消息在實際業務場景中使用的比較多,比如下面的一些場景:

    • 訂單超時未支付自動關閉,因為在很多場景中下單之后庫存就被鎖定了,這里需要將其進行超時關閉。
    • 需要一些延時的操作,比如一些兜底的邏輯,當做完某個邏輯之后,可以發送延時消息比如延時半個小時,進行兜底檢查補償。
    • 在某個時間給用戶發送消息,同樣也可以使用延時消息。

    在開源版本的 RocketMQ 中延時消息并不支持任意時間的延時,需要設置幾個固定的延時等級,目前默認設置為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,從 1s 到 2h 分別對應著等級 1到 18,而阿里云中的版本(收費)是可以支持40天內的任何時刻(毫秒級別)。我們先看下在RocketMQ中定時任務原理圖:

    Step1:Producer 在自己發送的消息上設置好需要延時的級別。
    Step2: Broker 發現此消息是延時消息,將 Topic 進行替換成延時 Topic,每個延時級別都會作為一個單獨的 queue,將自己的 Topic 作為額外信息存儲。
    Step3: 構建 ConsumerQueue
    Step4: 定時任務定時掃描每個延時級別的 ConsumerQueue。
    Step5: 拿到 ConsumerQueue 中的 CommitLog 的 Offset,獲取消息,判斷是否已經達到執行時間
    Step6: 如果達到,那么將消息的 Topic 恢復,進行重新投遞。如果沒有達到則延遲沒有達到的這段時間執行任務。
    可以看見延時消息是利用新建單獨的 Topic 和 Queue 來實現的,如果我們要實現 40 天之內的任意時間度,基于這種方案,那么需要 402460601000 個 queue,這樣的成本是非常之高的,那阿里云上面的支持任意時間是怎么實現的呢?這里猜測是持久化二級 TimeWheel 時間輪,二級時間輪用于替代我們的 ConsumeQueue,保存 Commitlog-Offset,然后通過時間輪不斷的取出當前已經到了的時間,然后再次投遞消息。

    7. RocketMQ 事務消息

    事務消息同樣的也是 RocketMQ 中的一大特色,其可以幫助我們完成分布式事務的最終一致性:

    加粗樣式
    具體使用事務消息步驟如下:
    Step1: 調用 sendMessageInTransaction 發送事務消息。
    Step2: 如果發送成功,則執行本地事務。
    Step3: 如果執行本地事務成功則發送 commit,如果失敗則發送 rollback。
    Step4: 如果其中某個階段比如 commit 發送失敗,rocketMQ 會進行定時從 Broker 回查,本地事務的狀態。
    事務消息的使用整個流程相對之前幾種消息使用比較復雜,下面是事務消息實現的原理圖:

    Step1: 發送事務消息,這里也叫做 halfMessage,會將 Topic 替換為 HalfMessage 的 Topic。
    Step2: 發送 commit 或者 rollback,如果是 commit 這里會查詢出之前的消息,然后將消息復原成原 Topic,并且發送一個 OpMessage 用于記錄當前消息可以刪除。如果是 rollback 這里會直接發送一個 OpMessage 刪除。
    Step3: 在 Broker 有個處理事務消息的定時任務,定時對比 halfMessage 和 OpMessage, 如果有OpMessage 且狀態為刪除,那么該條消息必定 commit 或者 rollback,所以就可以刪除這條消息。
    Step4: 如果事務超時(默認是6s),還沒有 opMessage,那么很有可能 commit 信息丟了,這里會去反查我們的 Producer 本地事務狀態。
    Step5: 根據查詢出來的信息做 Step2。
    我們發現 RocketMQ 實現事務消息也是通過修改原 Topic 信息,和延遲消息一樣,然后模擬成消費者進行消費,做一些特殊的業務邏輯。當然我們還可以利用這種方式去做 RocketMQ 更多的擴展。

    8、參考鏈接

    [01] Java工程師的進階之路 -【 RocketMQ (二)】

    總結

    以上是生活随笔為你收集整理的525、Java工程师的进阶之路 -【 RocketMQ (二)】 2022.01.06的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。