日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ 消费者(1)概念和消费流程

發布時間:2024/5/14 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ 消费者(1)概念和消费流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 背景

RocketMQ 的消費可以算是 RocketMQ 的業務邏輯中最復雜的一塊。這里面涉及到許多消費模式和特性。本想一篇文章寫完,寫到后面發現消費涉及到的內容太多,于是決定分多篇來寫。本文作為消費系列的第一篇,主要講述 RocketMQ 消費涉及到的模式和特性,也會概括性地講一下消費流程。

我將 RocketMQ 的消費流程大致分成 4 個步驟

  • 重平衡
  • 消費者拉取消息
  • Broker 接收拉取請求后從存儲中查詢消息并返回
  • 消費者消費消息
  • 每個步驟都會用一篇文章來講解。

    先了解一下 RocketMQ 消費涉及到地概念

    2. 概念簡述

    2.1 消費組概念與消費模式

    和大多數消息隊列一樣,RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。在了解它們之前,需要先引入消費組的概念。

    2.1.1 消費組

    一個消費者實例即是一個消費者進程,負責消費消息。單個消費者速度有限,在實際使用中通常會采用多個消費者共同消費同樣的 Topic 以加快消費速度。這多個消費同樣 Topic 的消費者組成了消費者組。

    消費組是一個邏輯概念,它包含了多個同一類的消費者實例,通常這些消費者都消費同一類消息(都消費相同的 Topic)且消費邏輯一致。

    消費組的引入是用來在消費消息時更好地進行負載均衡和容錯。

    2.1.2 廣播消費模式(BROADCASTING)

    廣播消費模式即全部的消息會廣播分發到所有的消費者實例,每個消費者實例會收到全量的消息(即便消費組中有多個消費者都訂閱同一 Topic)。

    如下圖所示,生產者發送了 5 條消息,每個消費組中的消費者都收到全部的 5 條消息。

    廣播模式使用較少,適合各個消費者都需要通知的場景,如刷新應用中的緩存。

    注意事項:

  • 廣播消費模式下不支持 順序消息
  • 廣播消費模式下不支持 重置消費位點。
  • 每條消息都需要被相同訂閱邏輯的多臺機器處理。
  • 消費進度在客戶端維護,出現重復消費的概率稍大于集群模式。如果消費進度文件丟失,存在消息丟失的可能。
  • 廣播模式下,消息隊列 RocketMQ 版保證每條消息至少被每臺客戶端消費一次,但是并不會重投消費失敗的消息,因此業務方需要關注消費失敗的情況。
  • 廣播模式下,客戶端每一次重啟都會從最新消息消費??蛻舳嗽诒煌V蛊陂g發送至服務端的消息將會被自動跳過,請謹慎選擇。
  • 廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。
  • 廣播模式下服務端不維護消費進度,所以消息隊列 RocketMQ 版控制臺不支持消息堆積查詢、消息堆積報警和訂閱關系查詢功能。
  • 2.1.3 集群消費模式(CLUSTERING)

    集群消費模式下,同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。

    更具體一點,在同一消費組中的不同消費者會根據負載機制來平均地訂閱 Topic 中的每個 Queue。(默認 AVG 負載方式)

    RocketMQ 默認使用集群消費模式,這也是大部分場景下會使用到的消費模式。

    2.2 消費者拉取消息模式

    2.2.1 Pull

    指消費者主動拉取消息進行消費,主動從 Broker 拉取消息,主動權由消費者應用控制。

    2.2.2 Push

    Broker 主動將消息 Push 給消費者,Broker 收到消息就會主動推送到消費者端。該模式的消費實時性較高,也是主流場景中普遍采用的消費形式。

    消費者組中的消費者實例會根據預設的負載均衡算法對 Topic 中的 Queue 進行均勻的訂閱,每個 Queue 最多只能被一個消費者訂閱。

    在 RocketMQ 中,Push 消費其實也是由 Pull 消費(拉取)實現。Push 消費只是通過客戶端 API 層面的封裝讓用戶感覺像是 Broker 在推送消息給消費者。

    2.2.3 POP

    RocketMQ 5.0 引入的新消費形式,是 Pull 拉取的另一種實現。也可以在 Push 模式下使用 POP 拉取消息,甚至可以和 Push 模式共同使用(分別消費重試 Topic 和普通 Topic)。

    POP 與 Pull 可以通過一個開關實時進行切換。POP 模式下,Broker 來控制每個消費者消費的隊列和拉取的消息,把重平衡邏輯從客戶端移到了服務端。

    主要解決了原來 Push 模式消費的以下痛點:

    • 富客戶端:客戶端邏輯比較重,多語言支持不友好
    • 隊列獨占:Topic 中的一個 Queue 最多只能被 1 個 Push 消費者消費,消費者數量無法無限擴展。且消費者 hang 住時該隊列的消息會堆積。
    • 消費后更新 offset:本地消費成功才會提交 offset

    RocketMQ 5.0 的輕量化 gRPC 客戶端就是基于 POP 消費模式開發

    2.3 隊列負載機制與重平衡

    在集群消費模式下,消費組中的消費者共同消費訂閱的 Topic 中的所有消息,這里就存在 Topic 中的隊列如何分配給消費者的問題。

    2.3.1 隊列負載機制

    RocketMQ Broker 中的隊列負載機制將一個 Topic 的不同隊列按照算法盡可能平均地分配給消費者組中的所有消費者。RocketMQ 預設了多種負載算法供不同場景下的消費。

    AVG:將隊列按數量平均分配給多個消費者,按 Broker 順序先分配第一個 Broker 的所有隊列給第一個消費者,然后給第二個。

    AVG_BY_CIRCLE:將 Broker 上的隊列輪流分給不同消費者,更適用于 Topic 在不同 Broker 之間分布不均勻的情況。

    默認采用 AVG 負載方式。

    2.3.2 重平衡(Rebalance)

    為消費者分配隊列消費的這一個負載過程并不是一勞永逸的,比如當消費者數量變化、Broker 掉線等情況發生后,原先的負載就變得不再均衡,此時就需要重新進行負載均衡,這一過程被稱為重平衡機制。

    每隔 20s,RocketMQ 會進行一次檢查,檢查隊列數量、消費者數量是否發生變化,如果變化則觸發消費隊列重平衡,重新執行上述負載算法。

    2.4 消費端高可靠

    2.4.1 重試-死信機制

    在實際使用中,消息的消費可能出現失敗。RocketMQ 擁有重試機制和死信機制來保證消息消費的可靠性。

  • 正常消費:消費成功則提交消費位點

  • 重試機制:如果正常消費失敗,消息會被消費者發回 Broker,放入重試 Topic: %RETRY%消費者組。最多重試消費 16 次,重試的時間間隔逐漸變長。(消費者組會自動訂閱重試 Topic)。

    這里地延遲重試采用了 RocketMQ 的延遲消息,重試的 16 次時間間隔為延遲消息配置的每個延遲等級的時間(從第三個等級開始)。如果修改延遲等級時間的配置,重試的時間間隔也會相應發生變化。但即便延遲等級時間間隔配置不足 16 個,仍會重試 16 次,后面按照最大的時間間隔來重試。

  • 死信機制:如果正常消費和重試 16 次均失敗,消息會保存到死信 Topic %DLQ%消費者組 中,此時需人工介入處理

  • 2.4.2 隊列負載機制與重平衡

    當發生 Broker 掛掉或者消費者掛掉時,會引發重平衡,可以自動感知有組件掛掉的情況并重新調整消費者的訂閱關系。

    2.5 并發消費與順序消費

    在消費者客戶端消費時,有兩種訂閱消息的方式,分別是并發消費和順序消費。廣播模式不支持順序消費,僅有集群模式能使用順序消費。

    需要注意的是,這里所說的順序消費指的是隊列維度的順序,即在消費一個隊列時,消費消息的順序和消息發送的順序一致。如果一個 Topic 有多個隊列, 是不可能達成 Topic 級別的順序消費的,因為無法控制哪個隊列的消息被先消費。Topic 只有一個隊列的情況下能夠實現 Topic 級別的順序消費。

    具體順序生產和消費代碼見 官方文檔。

    順序生產的方式為串行生產,并在生產時指定隊列。

    并發消費的方式是調用消費者的指定 MessageListenerConcurrently 作為消費的回調類,順序消費則使用 MessageListenerOrderly 類進行回調。處理這兩種消費方式的消費服務也不同,分別是 ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService。

    順序消費的大致原理是依靠兩組鎖,一組在 Broker 端(Broker 鎖),鎖定隊列和消費者的關系,保證同一時間只有一個消費者在消費;在消費者端也有一組鎖(消費隊列鎖)以保證消費的順序性。

    2.6 消費進度保存和提交

    消費者消費一批消息完成之后,需要保存消費進度。如果是集群消費模式,還需要將消費進度讓其他消費者知道,所以需要提交消費進度。這樣在消費者重啟或隊列重平衡時可以根據消費進度繼續消費。

    不同模式下消費進度保存方式的不同:

  • 廣播模式:保存在消費者本地。因為每個消費者都需要消費全量消息消息。在 LocalfileOffsetStore 當中。
  • 集群模式:保存在 Broker,同時消費者端緩存。因為一個 Topic 的消息只要被消費者組中的一個消費者消費即可,所以消息的消費進度需要統一保存。通過 RemoteBrokerOffsetStore 存儲。
  • 集群模式下,消費者端有定時任務,定時將內存中的消費進度提交到 Broker,Broker 也有定時任務將內存中的消費偏移量持久化到磁盤。此外,消費者向 Broker 拉取消息時也會提交消費偏移量。注意,消費者線程池提交的偏移量是線程池消費的這一批消息中偏移量最小的消息的偏移量。

  • 消費完一批消息后將消息消費進度存在本地內存
  • 消費者中有一個定時線程,每 5s 將內存中所有隊列的消費偏移量提交到 Broker
  • Broker 收到消費進度先緩存到內存,有一個定時任務每隔 5s 將消息偏移量持久化到磁盤
  • 消費者向 Broker 拉取消息時也會將隊列的消息偏移量提交到 Broker
  • 3. 消費流程

    這張圖是阿里云的文章講解消費時用到的,能夠清晰地表示客戶端 Push 模式并發消費流程。

    從左上角第一個方框開始看

  • 消費者啟動時喚醒重平衡服務 RebalanceService,重平衡服務是客戶端開始消費的起點。
  • 重平衡服務會周期性(每 20s)執行重平衡方法 doRebalance),查詢所有注冊的 Broker,根據注冊的 Broker 數量為自身分配負載的隊列 rebalanceByTopic()
  • 分配完隊列后,會為每個分配到的新隊列創建一個消息拉取請求 pullRequest,這個拉取請求中保存一個處理隊列 processQueue,即圖中的紅黑樹(TreeMap),用來保存拉取到的消息。紅黑樹保存消息的順序。
  • 消息拉取線程應用生產-消費模式,用一個線程從拉取請求隊列 pullRequestQueue 中彈出拉取請求,執行拉取任務,將拉取到的消息放入處理隊列。
  • 拉取請求在一次拉取消息完成之后會復用,重新被放入拉取請求隊列 pullRequestQueue 中
  • 拉取完成后,在 NettyClientPublicExecutorThreadPool 線程池異步處理結果,將拉取到的消息放入處理隊列,然后調用 consumeMessageService.submitConsumeRequest,將處理隊列和 多個消費任務提交到消費線程池。每個消費任務消費 1 批消息(1 批默認為 1 條)
  • 每個消費者都有一個消費線程池 consumeMessageThreadPool ,默認有 20 個消費線程。
  • 消費線程池的每個消費線程會嘗試從消費任務隊列中獲取消費請求,執行消費業務邏輯 listener.consumeMessage。
  • 消費完成后,如果消費成功,則更新偏移量 updateOffset(先更新到內存 offsetTable,定時上報到 Broker。Broker 端也先放到內存,定時刷盤)。
  • 參考資料

    • 官方文檔——設計
    • [RocketMQ 實戰與進階——丁威](http://learn.lianglianglee.com/專欄/RocketMQ 實戰與進階(完)/08 消息消費 API 與版本變遷說明.md)
    • RocketMQ消費消息——白云鵬
    • 消息中間件—RocketMQ消息消費(一)——癲狂俠
    • RocketMQ 消息接受流程——趙坤
    • RocketMQ 消息消費——貝貝貓
    • RocketMQ 5.0 POP 消費模式探秘
    • RocketMQ消息消費源碼分析
    • Rocketmq消費消息原理——服務端技術棧
    • RocketMQ——4. Consumer 消費消息——Kong

      本文由博客一文多發平臺 OpenWrite 發布!

    總結

    以上是生活随笔為你收集整理的RocketMQ 消费者(1)概念和消费流程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。