日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

消息中间件核心实体(1)

發布時間:2023/12/20 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息中间件核心实体(1) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

接上一篇《消息中間件核心實體(0)》,這一篇繼續介紹消息中間件中的一些實體。

上一篇主要是Message、Topic、TopicMeta和Queue這樣最基礎的實體,這幾篇介紹一些發送和消費的過程中會涉及到的實體和組件。

1. 發送

1.1 增強Message屬性

Message一般只包含topic、tag、content這些屬性,這些屬性也是使用方在發送時會涉及到的內容。但是光有這些屬性往往是不夠的,比如我們會需要記錄產生這條消息的Producer的信息;記錄消息的產生時間和產生的IP信息等等。這些信息都是在Client中給消息附加上去的,對發送方來說是透明的,所以不會在Message實體中暴露,而是我們會增加一個實體:EnhancedMessage。

EnhancedMessage繼承自Message,并會增加一些如下的屬性:

  • bornTime

  • bornAddress

  • producer

  • etc

引申一點,Producer發送消息的大致過程如下:

  • 增強Message屬性,得到EnhancedMessage的實例

  • 獲取可以寫入的隊列(也可以理解成獲取分區)

  • 向隊列寫入消息(可以是隊列暴露寫入接口或者由專門的寫入工具寫入到隊列中)

  • 偽代碼:

    EnhancedMessage msg = enhance(message); // 根據消息選擇一個可以寫入的目標隊列 WritableQueue queue = router.select(msg); // 寫入消息(queue實現write方法進行寫入) Result result = queue.write(msg);// write過程 // 將消息序列化成自定義協議的網絡包 Packet messagePacket = Serializer.encode(msg); // 發送網絡包 bootstrap.write(messagePacket);

    上面的WritableQueue暴露了API去寫入,具體實現可以是寫入到網絡,即遠端的一個Partition。而在做單元測試或者本地測試的時候,可以覆蓋write的實現,而不用真正寫入到網絡中,這會使代碼更容易測試測試。

    上面兩幅圖是Rocket開源版本中發送相關的一些代碼,私以為這段代碼非常的不優雅,讀起來特別累,特別是requestHeader的各種屬性設置。

    這段是Rocket開源版本中真正將消息寫入到網絡的實現,看起來總是非常臃腫,另外不知道是如何mock這些實現以達到在本地做測試的目的的。

    1.2 Queue的路由選擇

    發送過程中會涉及到隊列的選擇(分區的選擇),一條消息最終會根據一定的策略落到一個分區中,這里需要一個組件來完成選擇(把這個組件單獨抽象出來,這樣便于控制寫入的目標來進行測試,抽象出來也可以由使用方來實現,這樣可以按照使用方自己的場景做特定的路由)。

    路由組件非常的簡單,一般是Router會根據topic獲取到topic的元數據(元數據包含了多有分區的信息),然后根據消息的屬性或者用戶的參數計算出落到哪個分區,比如可以根據用戶的參數對分區總數取模來選擇分區,這樣可以做到將某一類消息發送到一個分區,比如同一個用戶的消息或同一筆訂單的不同消息。

    這個組件會比較簡單,但是在集成的時候需要注意一點,這個組件用戶可以自己注入到Producer中來達到控制分區選擇策略的目的。

    RocketMQ在TopicPublishInfo中實現分區的選擇,TopicPublishInfo包含了隊列信息(List<MessageQueue> messageQueueList屬性),筆者更傾向于抽象出獨立的路由組件,以便在特定的場景用戶可以自己實現路由,或者在測試時可以做到使用特定路由規則。

    2. 消費

    消費可以分為多種方式,從獲取消息的方式上可以分為Pull和Push兩種類型的Consumer;從消費消息的方式上可以分為集群消費和廣播消費。這里不展開討論各種模式的實現(以后單獨會討論Consumer該實現那些內容),會以Push模式&集群消費的Consumer為例,把消費流程中涉及到的一些組件進行介紹。

    2.1 分配分區

    集群消費中需要保證每個分區有且只有一個Consumer在進行消費。如果某個分區沒有Consumer消費,那么使用方拿不到完整的數據;如果某個分區被兩個Consumer消費,那么會產生大量的重復消息。所以這里需要實現一個分區分配策略,使在分布式環境中,每個Consumer拿到屬于自己的分區,且相互交叉。下面是四個分區兩個Consumer默認情況下的分配結果。

    實現的策略一般是:

  • 拿到一個Topic所有的分區,對這個列表進行排序

  • 拿到當前所有的Consumer,對Consumer列表進行排序

  • 根據自己所處的Consumer列表的位置和Consumer總數,從分區列表中獲取對應的一部分

  • 每個分區和Consumer都有唯一的ID,這樣各自按照排序后的結果進行分配,可以達到相互不交叉且不遺漏的目的。(在Consumer總數或分區數發生變化的過程中可能分配結果不正確,這個過程是短暫的,且在消費時還會結合鎖去保證分區只有一個Consumer消費,所以不會對實際消費產生影響)。

    同樣記住一點,這個分配策略是需要暴露出去的,系統可以默認實現集群消費和廣播消費的基礎策略,用戶可以實現自己的分配策略注入到系統中。

    2.2 消息緩存

    消費端一個重要的組件是消息緩存。為了提升性能,在消費端消息的獲取和消息的消費是異步的。Consumer內部有線程專門從服務端獲取消息寫入到消息緩存中,另外有線程從緩存中獲取消息調用用戶的回調接口來執行業務操作。

    消息緩存除了提供基礎的put和take來實現存入消息和取出消息,還需要自身容量,水位控制等配置。

    本身Buffer不是很復雜的部分,但是需要考慮一些流控策略,比如Buffer使用率到多少時降低從服務端獲取數據的頻率。

    RocketMQ中實現消息緩存由ProcessQueue實現,筆者傾向于獨立出Buffer模塊,另外Buffer需要提供鎖,以實現順序消費。

    2.3 消費進度

    還有一個重要的實體是消費進度,系統需要記錄“每個”Consumer的消費進度,且這個數據需要被持久化。

    消費進度需要記錄某個Group對某個Topic的某個分區的消費位點。進度是按照Topic維度去組織的(持久化在服務端),結構如下:

    topicgroup0cursor0、cursor1、cursor2...group1...實現的對象應該是: class Cursors {String topic;Cursor cursor;class Cursor {String group;// 用數組來存儲一個group消費的一個topic的所有分區的進度// 分區數一般情況下不會變更(變更場景很少),用數據就可以long[] cursors;} }

    Consumer可以在每一次獲取消息時將消費進度提交到服務端,在服務端來更新Cursors內部的數據。

    3. 結語

    最近兩篇內容將一些基礎實體和組件簡單的介紹了一下,下一篇討論一下消息應該由Server Push給Consumer還是Consumer主動來Pull消息。

    往期文章:

    消息中間件核心實體(0)

    消息的寫入和讀取流程

    NameServer模塊劃分

    Client模塊劃分

    Broker模塊劃分

    消息中間件架構討論

    業務方對消息中間件的需求

    消息中間件中的一些概念

    什么是分布式消息中間件?

    歡迎關注公眾號來交流MQ相關問題。

    ?

    轉載于:https://www.cnblogs.com/hzmark/p/mq_entity_1.html

    總結

    以上是生活随笔為你收集整理的消息中间件核心实体(1)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。