日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

阿里二面:RocketMQ 消息积压了,增加消费者有用吗?

發(fā)布時(shí)間:2025/3/15 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阿里二面:RocketMQ 消息积压了,增加消费者有用吗? 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

面試官:RocketMQ 消息積壓了,增加消費(fèi)者有用嗎?

:這個(gè)要看具體的場(chǎng)景,不同的場(chǎng)景下情況是不一樣的。

面試官:可以詳細(xì)說一下嗎?

:如果消費(fèi)者的數(shù)量小于 MessageQueue 的數(shù)量,增加消費(fèi)者可以加快消息消費(fèi)速度,減少消息積壓。比如一個(gè) Topic 有 4 個(gè) MessageQueue,2 個(gè)消費(fèi)者進(jìn)行消費(fèi),如果增加一個(gè)消費(fèi)者,明細(xì)可以加快拉取消息的頻率。如下圖:

如果消費(fèi)者的數(shù)量大于等于 MessageQueue 的數(shù)量,增加消費(fèi)者是沒有用的。比如一個(gè) Topic 有 4 個(gè) MessageQueue,并且有 4 個(gè)消費(fèi)者進(jìn)行消費(fèi)。如下圖

面試官:你說的第一種情況,增加消費(fèi)者一定能加快消息消費(fèi)的速度嗎?

:這...,一般情況下是可以的。

面試官:有特殊的情況嗎?

:當(dāng)然有。消費(fèi)者消息拉取的速度也取決于本地消息的消費(fèi)速度,如果本地消息消費(fèi)的慢,就會(huì)延遲一段時(shí)間后再去拉取。

面試官:在什么情況下消費(fèi)者會(huì)延遲一段時(shí)間后后再去拉取呢?

:消費(fèi)者拉取的消息存在 ProcessQueue,消費(fèi)者是有流量控制的,如果出現(xiàn)下面三種情況,就不會(huì)主動(dòng)去拉取:

  • ProcessQueue 保存的消息數(shù)量超過閾值(默認(rèn) 1000,可以配置);

  • ProcessQueue 保存的消息大小超過閾值(默認(rèn) 100M,可以配置);

  • 對(duì)于非順序消費(fèi)的場(chǎng)景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認(rèn) 2000,可以配置)。

這部分源碼請(qǐng)參考類:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。

面試官:還有其他情況嗎?

:對(duì)于順序消費(fèi)的場(chǎng)景,ProcessQueue 加鎖失敗,也會(huì)延遲拉取,這個(gè)延遲時(shí)間是 3s。

面試官:消費(fèi)者延遲拉取消息,一般可能是什么原因?qū)е碌哪?#xff1f;

:其實(shí)延遲拉取的本質(zhì)就是消費(fèi)者消費(fèi)慢,導(dǎo)致下次去拉取的時(shí)候 ProcessQueue 中積壓的消息超過閾值。以下面這張架構(gòu)圖為例:

消費(fèi)者消費(fèi)慢,可是能下面的原因:

  • 消費(fèi)者處理的業(yè)務(wù)邏輯復(fù)雜,耗時(shí)很長(zhǎng);

  • 消費(fèi)者有慢查詢,或者數(shù)據(jù)庫(kù)負(fù)載高導(dǎo)致響應(yīng)慢;

  • 緩存等中間件響應(yīng)慢,比如 Redis 響應(yīng)慢;

  • 調(diào)用外部服務(wù)接口響應(yīng)慢。

面試官:對(duì)于外部接口響應(yīng)慢的情況,有什么應(yīng)對(duì)措施嗎?

:這個(gè)要分情況討論。

如果調(diào)用外部系統(tǒng)只是一個(gè)通知,或者調(diào)用外部接口的結(jié)果并不處理,可以采用異步的方式,異步邏輯里采用重試的方式保證接口調(diào)成功。

如果外部接口返回結(jié)果必須要處理,可以考慮接口返回的結(jié)果是否可以緩存默認(rèn)值(要考慮業(yè)務(wù)可行),在調(diào)用失敗后采用快速降級(jí)的方式,使用默認(rèn)值替代返回接口返回值。

如果這個(gè)接口返回結(jié)果必須要處理,并且不能緩存,可以把拉取到的消息存入本地然后給 Broker 直接返回 CONSUME_SUCCESS。等外部系統(tǒng)恢復(fù)正常后再?gòu)谋镜厝〕鰜磉M(jìn)行處理。

面試官:如果消費(fèi)者數(shù)小于 MessageQueue 數(shù)量,并且外部系統(tǒng)響應(yīng)正常,為了快速消費(fèi)積壓消息而增加消費(fèi)者,有什么需要考慮的嗎?

:外部系統(tǒng)雖然響應(yīng)正常,但是增加多個(gè)消費(fèi)者后,外部系統(tǒng)的接口調(diào)用量會(huì)突增,如果達(dá)到吞吐量上限,外部系統(tǒng)會(huì)響應(yīng)變慢,甚至被打掛。

同時(shí)也要考慮本地?cái)?shù)據(jù)庫(kù)、緩存的壓力,如果數(shù)據(jù)庫(kù)響應(yīng)變慢,處理消息的速度就會(huì)變慢,起不到緩解消息積壓的作用。

面試官:新增加了消費(fèi)者后,怎么給它分配 MessageQueue 呢?

:Consumer 在拉取消息之前,需要對(duì) MessageQueue 進(jìn)行負(fù)載操作。RocketMQ 使用一個(gè)定時(shí)器來完成負(fù)載操作,默認(rèn)每間隔 20s 重新負(fù)載一次。

面試官:能詳細(xì)說一下都有哪些負(fù)載策略嗎?

:RocketMQ 提供了 6 種負(fù)載策略,依次來看一下。

平均負(fù)載策略

  • 把消費(fèi)者進(jìn)行排序;

  • 計(jì)算每個(gè)消費(fèi)者可以平均分配的 MessageQueue 數(shù)量;

  • 如果消費(fèi)者數(shù)量大于 MessageQueue 數(shù)量,多出的消費(fèi)者就分不到;

  • 如果不可以平分,就使用 MessageQueue 總數(shù)量對(duì)消費(fèi)者數(shù)量求余數(shù) mod;

  • 對(duì)前 mod 數(shù)量消費(fèi)者,每個(gè)消費(fèi)者加一個(gè),這樣就獲取到了每個(gè)消費(fèi)者分配的 MessageQueue 數(shù)量。

  • 比如 4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況:

    源代碼的邏輯非常簡(jiǎn)單,如下:

    //?AllocateMessageQueueAveragely?這個(gè)類 //?4?個(gè)?MessageQueue?和?3?個(gè)消費(fèi)者的情況,假如第一個(gè),index?=?0 int?index?=?cidAll.indexOf(currentCID); //?mod?=?1 int?mod?=?mqAll.size()?%?cidAll.size(); //?averageSize?=?2 int?averageSize?=mqAll.size()?<=?cidAll.size()???1?:?(mod?>?0?&&?index?<?mod???mqAll.size()?/?cidAll.size()+?1?:?mqAll.size()?/?cidAll.size()); //?startIndex?=?0 int?startIndex?=?(mod?>?0?&&?index?<?mod)???index?*?averageSize?:?index?*?averageSize?+?mod; //?range?=?2,所以第一個(gè)消費(fèi)者分配到了2個(gè) int?range?=?Math.min(averageSize,?mqAll.size()?-?startIndex); for?(int?i?=?0;?i?<?range;?i++)?{result.add(mqAll.get((startIndex?+?i)?%?mqAll.size())); }

    循環(huán)分配策略:

    這個(gè)很容易理解,遍歷消費(fèi)者,把 MessageQueue 分一個(gè)給遍歷到的消費(fèi)者,如果 MessageQueue 數(shù)量比消費(fèi)者多,需要進(jìn)行多次遍歷,遍歷次數(shù)等于 (MessageQueue 數(shù)量/消費(fèi)者數(shù)量),還是以 4 個(gè) MessageQueue 和 3 個(gè)消費(fèi)者的情況,如下圖:

    源代碼如下:

    //AllocateMessageQueueAveragelyByCircle?這個(gè)類 //4?個(gè)?MessageQueue?和?3?個(gè)消費(fèi)者的情況,假如第一個(gè),index?=?0 int?index?=?cidAll.indexOf(currentCID); for?(int?i?=?index;?i?<?mqAll.size();?i++)?{if?(i?%?cidAll.size()?==?index)?{//i?==?0?或者?i?==?3?都會(huì)走到這里result.add(mqAll.get(i));} }

    自定義分配策略

    這種策略在消費(fèi)者啟動(dòng)的時(shí)候可以指定消費(fèi)哪些 MessageQueue。可以參考下面代碼:

    AllocateMessageQueueByConfig?allocateMessageQueueByConfig?=?new?AllocateMessageQueueByConfig(); //綁定消費(fèi)?messageQueue1 allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new?MessageQueue("messageQueue1","broker1",0))); consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig); consumer.start();

    按照機(jī)房分配策略

    這種方式 Consumer 只消費(fèi)指定機(jī)房的 MessageQueue,如下圖:Consumer0、Consumer1、Consumer2 綁定 room1 和 room2 這兩個(gè)機(jī)房,而 room3 這個(gè)機(jī)房沒有消費(fèi)者。

    Consumer 啟動(dòng)的時(shí)候需要綁定機(jī)房名稱。可以參考下面代碼:

    AllocateMessageQueueByMachineRoom?allocateMessageQueueByMachineRoom?=?new?AllocateMessageQueueByMachineRoom(); //綁定消費(fèi)?room1?和?room2?這兩個(gè)機(jī)房 allocateMessageQueueByMachineRoom.setConsumeridcs(new?HashSet<>(Arrays.asList("room1","room2"))); consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom); consumer.start();

    這種策略 broker 的命名必須按照格式:機(jī)房名@brokerName,因?yàn)橄M(fèi)者分配隊(duì)列的時(shí)候,首先按照機(jī)房名稱過濾出所有的 MessageQueue,然后再按照平均分配策略進(jìn)行分配

    //AllocateMessageQueueByMachineRoom?這個(gè)類 List<MessageQueue>?premqAll?=?new?ArrayList<MessageQueue>(); for?(MessageQueue?mq?:?mqAll)?{String[]?temp?=?mq.getBrokerName().split("@");if?(temp.length?==?2?&&?consumeridcs.contains(temp[0]))?{premqAll.add(mq);} } //上面按照機(jī)房名稱過濾出所有的?MessageQueue?放入premqAll,后面就是平均分配策略

    按照機(jī)房就近分配

    跟按照機(jī)房分配原則相比,就近分配的好處是可以對(duì)沒有消費(fèi)者的機(jī)房進(jìn)行分配。如下圖,機(jī)房 3 的 MessageQueue 也分配到了消費(fèi)者:

    如果一個(gè)機(jī)房沒有消費(fèi)者,則會(huì)把這個(gè)機(jī)房的 MessageQueue 分配給集群中所有的消費(fèi)者。

    源碼所在類:AllocateMachineRoomNearby。

    一致性 Hash 算法策略

    把所有的消費(fèi)者經(jīng)過 Hash 計(jì)算分布到 Hash 環(huán)上,對(duì)所有的 MessageQueue ?進(jìn)行 Hash ?計(jì)算,找到順時(shí)針方向最近的消費(fèi)者節(jié)點(diǎn)進(jìn)行綁定。如下圖:


    源代碼如下:

    //所在類?AllocateMessageQueueConsistentHash Collection<ClientNode>?cidNodes?=?new?ArrayList<ClientNode>(); for?(String?cid?:?cidAll)?{cidNodes.add(new?ClientNode(cid)); } //使用消費(fèi)者構(gòu)建?Hash?環(huán),把消費(fèi)者分布在?Hash?環(huán)節(jié)點(diǎn)上 final?ConsistentHashRouter<ClientNode>?router;?//for?building?hash?ring if?(customHashFunction?!=?null)?{router?=?new?ConsistentHashRouter<ClientNode>(cidNodes,?virtualNodeCnt,?customHashFunction); }?else?{router?=?new?ConsistentHashRouter<ClientNode>(cidNodes,?virtualNodeCnt); } //對(duì)?MessageQueue?做?Hash?運(yùn)算,找到環(huán)上距離最近的消費(fèi)者 List<MessageQueue>?results?=?new?ArrayList<MessageQueue>(); for?(MessageQueue?mq?:?mqAll)?{ClientNode?clientNode?=?router.routeNode(mq.toString());if?(clientNode?!=?null?&&?currentCID.equals(clientNode.getKey()))?{results.add(mq);} }

    面試官:恭喜你,通過了。

    有道無(wú)術(shù),術(shù)可成;有術(shù)無(wú)道,止于術(shù)

    歡迎大家關(guān)注Java之道公眾號(hào)

    好文章,我在看??

    總結(jié)

    以上是生活随笔為你收集整理的阿里二面:RocketMQ 消息积压了,增加消费者有用吗?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。