RocketMQ的负载均衡
在了解了RocketMQ的發(fā)送與接收后,也好奇RocketMQ內(nèi)部是如何處理好生產(chǎn)端、消費(fèi)端的負(fù)載均衡的,下面通過(guò)分析源碼、查閱相關(guān)文檔資料以及結(jié)合自己的理解,做了下歸納總結(jié)。
RocketMQ的消息負(fù)載均衡都是下放到Client端來(lái)實(shí)現(xiàn)的,具體可細(xì)分為2塊:發(fā)送負(fù)載(Producer端)、消費(fèi)負(fù)載(Consumer端)。
?
1、發(fā)送負(fù)載
1.1 路由信息
消息生產(chǎn)者Producer作為客戶端發(fā)送消息時(shí)候,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒(méi)有則更新路由信息會(huì)從NameServer上重新拉取,同時(shí)Producer會(huì)默認(rèn)每隔30s向NameServer拉取一次路由信息。
1.2 選擇隊(duì)列
1.2.1 默認(rèn)方式(sendLatencyFaultEnable 開(kāi)關(guān)關(guān)閉)
生產(chǎn)者端發(fā)送消息時(shí),會(huì)根據(jù)Topic信息(每條消息都必須指定有Topic信息),從TopicPublishInfo中的messageQueueList中選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息。隨機(jī)遞增式的輪詢,每個(gè)生產(chǎn)者都通過(guò)ThreadLocal維護(hù)自己的一套下標(biāo)index,初始化時(shí)產(chǎn)生隨機(jī)數(shù)生成下標(biāo),后續(xù)每次都遞增加1后對(duì)隊(duì)列個(gè)數(shù)取模,從而獲取對(duì)應(yīng)下標(biāo)的messageQueue。
1.2.2 Broker故障延遲方式(sendLatencyFaultEnable 開(kāi)關(guān)打開(kāi))
在隨機(jī)遞增取模的基礎(chǔ)上,結(jié)合消息失敗延遲策略,過(guò)濾掉暫時(shí)認(rèn)為不可用的Broker的消息隊(duì)列。
消息失敗延遲策略的算法在MQFaultStrategy上實(shí)現(xiàn)(MQFaultStrategy也被稱為失敗延遲策略實(shí)現(xiàn)的門面類),其中2個(gè)重要的參數(shù) latencyMax、notAvailableDuration(單位都是毫秒)。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};這2個(gè)參數(shù)如何結(jié)合實(shí)現(xiàn)延遲的呢?
latencyMax,在發(fā)送消息后,根據(jù)本次消息的發(fā)送耗時(shí) currentLatency,從latencyMax數(shù)組最后一個(gè)值往前找,直到第一個(gè)比currentLatency小的值,其對(duì)應(yīng)的下標(biāo)為currentIdx,則可設(shè)置Broker的不可用時(shí)長(zhǎng)為notAvailableDuration[currentIdx],調(diào)用門面類updateFaultItem方法進(jìn)行更新,以此達(dá)到退避的效果。
舉個(gè)例子,如果請(qǐng)求的latency為3300L,則currentLatency=5,對(duì)應(yīng)的不可用時(shí)長(zhǎng)為notAvailableDuration[5]=180000L,也即本次記錄broker需要退避的時(shí)長(zhǎng)180秒。
該延遲機(jī)制(latencyFaultTolerance)也是消費(fèi)者端實(shí)現(xiàn)高可用的核心所在。
?
2、消費(fèi)負(fù)載
這里主要講消費(fèi)端的集群消費(fèi)模式下的處理(另一種模式是廣播模式)。
2.1 消息獲取模型概述
目前客戶端與服務(wù)端(Broker)之間有兩種模式:推模式、拉模式。
這里的推模式是基于拉模式進(jìn)行了封裝,也即通過(guò)長(zhǎng)輪詢的方式來(lái)達(dá)到兼具Pull與Push的優(yōu)點(diǎn)。在服務(wù)端收到客戶端的請(qǐng)求后,會(huì)進(jìn)行查詢,如果隊(duì)列里沒(méi)有數(shù)據(jù),此時(shí)服務(wù)端先掛起,不著急返回,等待一定時(shí)間(默認(rèn)5s)后,會(huì)再進(jìn)一步繼續(xù)查詢,當(dāng)一直未查詢到結(jié)果并超過(guò)重試次數(shù)后返回空結(jié)果(比較適合在客戶端連接數(shù)量可控的場(chǎng)景中)。
PS,RocketMQ的前身,第一代的Notify主要使用了推模型,解決了事務(wù)消息。第二代的MetaQ則主要使用了拉模型,解決了順序消息和海量堆積的問(wèn)題。所以一個(gè)優(yōu)秀的項(xiàng)目其實(shí)都是在不斷進(jìn)化演變中的。
2.2 消費(fèi)者隊(duì)列如何負(fù)載
消息消費(fèi)隊(duì)列在同一消費(fèi)組不同消費(fèi)者之間的負(fù)載均衡,其核心設(shè)計(jì)理念是在一個(gè)消息消費(fèi)隊(duì)列在同一時(shí)間只允許被同一消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi),一個(gè)消息消費(fèi)者能同時(shí)消費(fèi)多個(gè)消息隊(duì)列。
在RocketMQ中,消息隊(duì)列的負(fù)載均衡是由客戶端啟動(dòng)MQClientInstance實(shí)例部分時(shí),觸發(fā)負(fù)載均衡服務(wù)線程(具體由RebalanceService線程實(shí)現(xiàn)),默認(rèn)每20s執(zhí)行一次。
底層實(shí)現(xiàn)均衡的邏輯是在RebalanceImpl類的rebalanceByTopic()方法中。代碼如下:
/*** 消費(fèi)負(fù)載均衡核心方法** @param topic 待重均衡主題* @param isOrder*/ private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}/** 集群模式 */case CLUSTERING: {/** 1、獲取該topic下的所有mq消費(fèi)隊(duì)列 */Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);/** 2、獲取該topic、消費(fèi)者分組下的所有消費(fèi)者id */List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);/** 3、獲取消息隊(duì)列分配策略 */AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;/** 4、開(kāi)始給當(dāng)前消費(fèi)者分配消費(fèi)隊(duì)列 */List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}/** 5、重均衡后,更新快照隊(duì)列信息 */boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;} }具體過(guò)程解釋(針對(duì)集群模式):
1、獲取該topic下的所有mq消息隊(duì)列;
2、獲取該topic、消費(fèi)者分組下的所有消費(fèi)者id;
3、校驗(yàn)步驟1/2中任意一個(gè)結(jié)果,如果結(jié)果為空則跳過(guò)不做處理;否則進(jìn)入步驟4;
4、獲取消息隊(duì)列分配策略;
目前RocketMQ提供了6種分配算法,默認(rèn)使用消息隊(duì)列的平均分配算法(AllocateMessageQueueAveragely),也推薦使用這種。
平均算法舉例說(shuō)明:假設(shè)有8個(gè)隊(duì)列,q1,q2,……,q8,有3個(gè)消費(fèi)者c1,c2,c3,則在平均分配算法下,各消費(fèi)者的分配隊(duì)列如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
(也因此可以看出,當(dāng)消費(fèi)者數(shù)量大于隊(duì)列數(shù)量時(shí),則會(huì)存在消費(fèi)者無(wú)法分配到隊(duì)列的情況)
RocketMQ提供的6種分配算法5、重均衡后,更新快照隊(duì)列信息(ProcessQueueTable)
此時(shí)調(diào)用RebalanceImpl#updateProcessQueueTableInRebalance()進(jìn)行處理
假設(shè)本次通過(guò)上面幾個(gè)步驟分配后得到的隊(duì)列集合(mqSet)為mq1,mq2,mq3,mq4,在更新ProcessQueueTable中,會(huì)拿已分配到的隊(duì)列與當(dāng)前的消費(fèi)隊(duì)列快照(Queue consumption snapshot)比對(duì)。
變量解釋說(shuō)明:
processQueueTable:當(dāng)前消費(fèi)者負(fù)載的消息隊(duì)列緩存表,結(jié)構(gòu)是 ConcurrentMap<MessageQueue, ProcessQueue>
隊(duì)列的比對(duì)情況(3種)以及對(duì)應(yīng)執(zhí)行的操作如下:
1)當(dāng)前快照隊(duì)列集合存在,新分配隊(duì)列集合不存在(假設(shè)為上圖processQueueTable標(biāo)注的紅色部分,e1,e2)
執(zhí)行剔除e1,e2的操作,將狀態(tài)標(biāo)識(shí)字段 droped 置為 true,這樣,該 ProcessQueue 中的消息將不會(huì)再被消費(fèi)。
2)當(dāng)前快照隊(duì)列集合存在,新分配隊(duì)列集合也存在(假設(shè)為上圖processQueueTable標(biāo)注的綠色部分,e3,e4)
Pull模式直接忽略不做調(diào)整;Push模式下判斷processQueueTable中的該2個(gè)ProcessQueue是否已過(guò)期,已過(guò)期則移除。
3)當(dāng)前快照隊(duì)列集合不存在,新分配隊(duì)列集合存在(假設(shè)為上圖processQueueTable標(biāo)注的白色部分,e5,e6);
本次新增的消息隊(duì)列,添加入processQueueTable中。
至此,完成了消費(fèi)端的負(fù)載均衡。
?
?
?
?
?
?
?
?
?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的RocketMQ的负载均衡的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 众辰变频器nz200t参数_【变频器 上
- 下一篇: 几款常用的OCR技术软件 新3