日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ:Producer启动流程与消息发送源码分析

發(fā)布時(shí)間:2025/3/21 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ:Producer启动流程与消息发送源码分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

    • Producer
      • 1.方法和屬性
      • 2.啟動(dòng)流程
      • 3.消息發(fā)送
        • 3.1驗(yàn)證消息
        • 3.2查找路由
        • 3.3選擇隊(duì)列
        • 3.4發(fā)送消息
        • 3.5發(fā)送批量消息

Producer

在RocketMQ中,消息生產(chǎn)者就是客戶端,即消息的提供者。

以下是消息生產(chǎn)者Producer項(xiàng)目預(yù)覽圖:

1.方法和屬性

Producer的相關(guān)核心類:

MQAdmin接口方法介紹:

//創(chuàng)建主題 void createTopic(final String key, final String newTopic, final int queueNum)throws MQClientException; //根據(jù)時(shí)間戳從隊(duì)列中查找消息偏移量 long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; //查找消息隊(duì)列中最大/小偏移量 long maxOffset(final MessageQueue mq) throws MQClientException; long minOffset(final MessageQueue mq) throws MQClientException; //根據(jù)偏移量查找信息 MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,InterruptedException, MQClientException; //根據(jù)條件查詢消息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end) throws MQClientException, InterruptedException; //根據(jù)主題和消息ID查詢消息 MessageExt viewMessage(String topic,String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

MQProducer接口方法介紹:

//啟動(dòng) void start() throws MQClientException; //關(guān)閉 void shutdown(); //查找該主題下的所有消息隊(duì)列 List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; //同步發(fā)送消息 SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException; //同步超時(shí)發(fā)送消息 SendResult send(final Message msg, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException; //異步發(fā)送消息 void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException; //異步并附帶超時(shí)時(shí)間的消息發(fā)送 void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException; //發(fā)送單向消息-無需關(guān)注返回結(jié)果-void void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException; //同步并指定消息隊(duì)列發(fā)送消息 SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException; //選擇指定隊(duì)列異步發(fā)送消息 void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)throws MQClientException, RemotingException, InterruptedException; //批量發(fā)送消息 SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

DefaultMQProducer:

屬性介紹:

producerGroup:生產(chǎn)者所屬組 createTopicKey:默認(rèn)Topic defaultTopicQueueNums:默認(rèn)主題在每一個(gè)Broker隊(duì)列數(shù)量 sendMsgTimeout:發(fā)送消息默認(rèn)超時(shí)時(shí)間,默認(rèn)3s compressMsgBodyOverHowmuch:消息體超過該值則啟用壓縮,默認(rèn)4k retryTimesWhenSendFailed:同步方式發(fā)送消息重試次數(shù),默認(rèn)為2,總共執(zhí)行3次 retryTimesWhenSendAsyncFailed:異步方法發(fā)送消息重試次數(shù),默認(rèn)為2 retryAnotherBrokerWhenNotStoreOK:消息重試時(shí)選擇另外一個(gè)Broker時(shí),是否不等待存儲(chǔ)結(jié)果就返回,默認(rèn)為false maxMessageSize:允許發(fā)送的最大消息長度,默認(rèn)為4M

2.啟動(dòng)流程

啟動(dòng)時(shí)序圖如下:

DefaultMQProducerImpl#start

switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;//檢查生產(chǎn)者組配置this.checkConfig();//生產(chǎn)組名=CLIENT_INNER_PRODUCERif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {//將生產(chǎn)者實(shí)例名稱改為PIDthis.defaultMQProducer.changeInstanceNameToPID();}//獲得MQ客戶端實(shí)例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//注冊生產(chǎn)者到MQClientInstance中并返回注冊結(jié)果boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//存入Topic主題發(fā)布信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());//如果是初次啟動(dòng)if (startFactory) {//啟動(dòng)MQ客戶端mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());//將服務(wù)狀態(tài)改為RUNNINGthis.serviceState = ServiceState.RUNNING;break;

MQClientManager

//單例-一個(gè)JVM中只存在一個(gè)MQClientManager實(shí)例 private static MQClientManager instance = new MQClientManager(); //MQClientManager-維護(hù)一個(gè)MQClientInstance緩存表 //同一個(gè)clientId只會(huì)對應(yīng)一個(gè)MQClientInstance //MQClientInstance封裝了RocketMQ網(wǎng)絡(luò)處理API,是消息生產(chǎn)者和消息消費(fèi)者與NameServer、Broker打交道的網(wǎng)絡(luò)通道 private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();

MQClientManager#getOrCreateMQClientInstance

//構(gòu)建MQClientId String clientId = clientConfig.buildMQClientId(); //在緩存表中查詢是否存在instance MQClientInstance instance = this.factoryTable.get(clientId); //instance不存在 if (null == instance) {//構(gòu)建instanceinstance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);//存入表中MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);} }return instance;

3.消息發(fā)送

消息發(fā)送時(shí)序圖:

DefaultMQProducerImpl#send(Message msg)

/*** DEFAULT SYNC -------------------------------------------------------*/ public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }

DefaultMQProducer#send(Message msg,long timeout)

/**** @param msg* @param timeout 默認(rèn)超時(shí)時(shí)長為3s* @return 返回發(fā)送結(jié)果*/ public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }

DefaultMQProducerImpl#sendDefaultImpl

3.1驗(yàn)證消息

//驗(yàn)證消息 Validators.checkMessage(msg, this.defaultMQProducer);

Validators#checkMessage

//檢查是否為空 if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic-校驗(yàn)主題 Validators.checkTopic(msg.getTopic()); //是否是禁止發(fā)送的消息主題 Validators.isNotAllowedSendTopic(msg.getTopic());// body-檢查是否為空 if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } //body-檢查消息體是否大于消息最大限制大小 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); }

3.2查找路由

DefaultMQProducerImpl#tryToFindTopicPublishInfo

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

DefaultMQProducerImpl#tryToFindTopicPublishInfo

//在本地緩存中獲取主題的路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //若路由信息為空 || !ok() -> !(null != this.messageQueueList && !this.messageQueueList.isEmpty()) if (null == topicPublishInfo || !topicPublishInfo.ok()) {//為主題創(chuàng)建路由信息-存入緩存表this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//從nameServer中獲取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic); } //如果查詢出的Info合法-返回Info if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo; } else {//否則將從nameServer獲取的路由信息更新到緩存表中this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo; }

TopicPublishInfo

public class TopicPublishInfo {private boolean orderTopic = false; //是否是順序消息private boolean haveTopicRouterInfo = false; //是否有路由信息private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //該主題的消息隊(duì)列private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); //每選擇一次消息隊(duì)列該值+1private TopicRouteData topicRouteData; //關(guān)聯(lián)Topic路由元信息 }

TopicRouteData

public class TopicRouteData extends RemotingSerializable {private String orderTopicConf; //順序消息配置private List<QueueData> queueDatas; //Broker隊(duì)列信息private List<BrokerData> brokerDatas; //Broker信息private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; //消息過濾表 }

MQClientInstance#updateTopicRouteInfoFromNameServer

//this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)-加鎖 TopicRouteData topicRouteData; //使用默認(rèn)主題從NameServer獲取路由信息 if (isDefault && defaultMQProducer != null) {topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}} } else {//使用指定主題從NameServer獲取路由信息topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } if (changed) {//克隆出一份主題路由信息TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{//將topicRouteData轉(zhuǎn)化為TopicPublishInfoTopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);//是否有主題路由信息-設(shè)置為truepublishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();//遍歷生產(chǎn)者while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();//如果生產(chǎn)者不為空if (impl != null) {//更新publishInfo信息impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{//主題訂閱信息-消息消費(fèi)隊(duì)列Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//遍歷消費(fèi)者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();//消費(fèi)者不為空if (impl != null) {//更新subscribeInfo信息impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true; }

MQClientInstance#topicRouteData2TopicPublishInfo

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {//創(chuàng)建TopicPublishInfo對象TopicPublishInfo info = new TopicPublishInfo();//關(guān)聯(lián)TopicRouteData信息info.setTopicRouteData(route);//順序消息,更新TopicPublishInfoif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {//獲取Broke列表-用分號隔開String[] brokers = route.getOrderTopicConf().split(";");for (String broker : brokers) {String[] item = broker.split(":");int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {MessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}//設(shè)置為順序消息info.setOrderTopic(true);} else {//非順序消息更新TopicPublishInfo//獲取消息隊(duì)列信息List<QueueData> qds = route.getQueueDatas();Collections.sort(qds);//遍歷topic隊(duì)列信息for (QueueData qd : qds) {//權(quán)限為可寫if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData = null;//遍歷寫隊(duì)列for (BrokerData bd : route.getBrokerDatas()) {//根據(jù)名稱獲取寫隊(duì)列對應(yīng)的brokerif (bd.getBrokerName().equals(qd.getBrokerName())) {brokerData = bd;break;}}if (null == brokerData) {continue;}if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}//填充TopicPublishInfo消息隊(duì)列列表for (int i = 0; i < qd.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);info.getMessageQueueList().add(mq);}}}//順序消息設(shè)置為falseinfo.setOrderTopic(false);}return info; }

3.3選擇隊(duì)列

//DefaultMQProducerImpl#sendDefaultImpl MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); //DefaultMQProducerImpl#selectOneMessageQueue return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);

MQFaultStrategy#selectOneMessageQueue

//是否啟用Broker故障延遲機(jī)制 if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0) {pos = 0;}MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue(); } //不啟用Broker故障延遲機(jī)制 return tpInfo.selectOneMessageQueue(lastBrokerName);

默認(rèn)不啟用Broker故障延遲機(jī)制

TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//第一次選擇消息隊(duì)列if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {//選擇一次-sendWhichQueue自增int index = this.sendWhichQueue.incrementAndGet();//取模int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0) {pos = 0;}//輪詢選擇消息隊(duì)列MessageQueue mq = this.messageQueueList.get(pos);//規(guī)避上次選擇的隊(duì)列if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();} }

TopicPublishInfo#selectOneMessageQueue()

//第一次選擇消息隊(duì)列 public MessageQueue selectOneMessageQueue() {//sendWhichQueue自增int index = this.sendWhichQueue.incrementAndGet();//對隊(duì)列大小取模int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0) {pos = 0;}//返回對應(yīng)的隊(duì)列return this.messageQueueList.get(pos); }

啟用Broker故障延遲機(jī)制

DefaultMQProducerImpl#selectOneMessageQueue

//Broker故障延遲機(jī)制 if (this.sendLatencyFaultEnable) {try {//對sendWhichQueue自增int index = tpInfo.getSendWhichQueue().incrementAndGet();//對消息隊(duì)列輪詢獲取一個(gè)隊(duì)列for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0) {pos = 0;}MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//驗(yàn)證該隊(duì)列是否可用->可用即返回-不可用繼續(xù)輪詢if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}//沒有選出較為合適的消息隊(duì)列->讓延遲容錯(cuò)機(jī)制至少選出一個(gè)Broker出來final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//寫隊(duì)列個(gè)數(shù)int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);//寫隊(duì)列個(gè)數(shù)大于0if (writeQueueNums > 0) {//選出一個(gè)消息隊(duì)列->指定broker和隊(duì)列ID并返回final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {//該Broker也不可用->從容錯(cuò)隊(duì)列中移除該BrokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue(); }

  • 延遲機(jī)制接口規(guī)范
public interface LatencyFaultTolerance<T> {//更新失敗條目void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);//Broker是否可用boolean isAvailable(final T name);//移除Fault條目void remove(final T name);//嘗試從規(guī)避的Broker中選出一個(gè)可用的BrokerT pickOneAtLeast(); }
  • FaultItem:失敗條目
class FaultItem implements Comparable<FaultItem> {//條目唯一key->BrokerNameprivate final String name;//本次發(fā)送消息延遲private volatile long currentLatency;//故障規(guī)避開始時(shí)間private volatile long startTimestamp; }
  • 消息失敗策略
public class MQFaultStrategy {//根據(jù)currentLatency本地消息發(fā)送延遲,從latencyMax尾部向前找到第一個(gè)比currentLatency小的索引,如果沒有找到,返回0private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};//根據(jù)這個(gè)索引從notAvailableDuration取出對應(yīng)的時(shí)間,在該時(shí)長內(nèi),Broker設(shè)置為不可用private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; }

DefaultMQProducerImpl#sendDefaultImpl

//消息發(fā)送->發(fā)送成功調(diào)用回調(diào)函數(shù)sendCallback sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); //調(diào)用失敗記錄失敗時(shí)間戳 endTimestamp = System.currentTimeMillis(); //更新調(diào)用失敗條目 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

MQFaultStrategy#updateFaultItem

if (this.sendLatencyFaultEnable) {//計(jì)算broker規(guī)避的時(shí)長long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新該FaultItem規(guī)避時(shí)長this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); }

MQFaultStrategy#computeNotAvailableDuration

//遍歷latencyMax for (int i = latencyMax.length - 1; i >= 0; i--) {//找到第一個(gè)比currentLatency的latencyMax值if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i]; } //沒有找到則返回0 return 0;

LatencyFaultToleranceImpl#updateFaultItem

//原來的失敗條目信息 FaultItem old = this.faultItemTable.get(name); if (null == old) {//失敗條目為空->新建faultItem對象設(shè)置規(guī)避時(shí)長和開始時(shí)間final FaultItem faultItem = new FaultItem(name);faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);} } else {//不為空->更新規(guī)避時(shí)長和開始時(shí)間old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); }

3.4發(fā)送消息

DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg, //待發(fā)送消息final MessageQueue mq, //消息發(fā)送隊(duì)列 final CommunicationMode communicationMode, //消息發(fā)送模式->ASYNC/SYNC/ONEWAYfinal SendCallback sendCallback, //異步消息回調(diào)函數(shù)final TopicPublishInfo topicPublishInfo, //主題路由信息 final long timeout) //消息發(fā)送超時(shí)時(shí)間 //根據(jù)BrokerName獲取Broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //地址為空 if (null == brokerAddr) {//更新broker網(wǎng)絡(luò)地址信息tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } //非批量消息發(fā)送->設(shè)置消息唯一ID //批量消息->在消息打包過程中已經(jīng)生成唯一ID if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg); }boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true; }int sysFlag = 0; boolean msgBodyCompressed = false; //大于4k->進(jìn)行壓縮 if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true; } //如果是事務(wù)消息,設(shè)置消息標(biāo)記MessageSysFlag.TRANSACTION_PREPARED_TYPE final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } //如果注冊消息發(fā)送鉤子函數(shù)->消息發(fā)送之前進(jìn)行邏輯增強(qiáng) if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context); }

SendMessageHook

public interface SendMessageHook {String hookName();void sendMessageBefore(final SendMessageContext context);void sendMessageAfter(final SendMessageContext context); } //構(gòu)建消息發(fā)送請求頭 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); //生產(chǎn)者組 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //主題 requestHeader.setTopic(msg.getTopic()); //創(chuàng)建默認(rèn)主題 requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); //該主題默認(rèn)主題隊(duì)列個(gè)數(shù)4 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); //隊(duì)列ID requestHeader.setQueueId(mq.getQueueId()); //消息系統(tǒng)標(biāo)識(shí) requestHeader.setSysFlag(sysFlag); //消息發(fā)送時(shí)間戳 requestHeader.setBornTimestamp(System.currentTimeMillis()); //消息標(biāo)記 requestHeader.setFlag(msg.getFlag()); //消息拓展信息 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); //消息重試次數(shù)->初始為0 requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); //是否是批量消息 requestHeader.setBatch(msg instanceof MessageBatch); //如果是發(fā)送重試消息 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);//對消息重試次數(shù)進(jìn)行更新if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);} } SendResult sendResult = null; switch (communicationMode) {//異步發(fā)送 case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;//如果消息體被壓縮if (msgBodyCompressed) {//msgBody應(yīng)該使用prevBodytmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//發(fā)送消息并返回結(jié)果sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY://同步發(fā)送 case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break; } //如果注冊了鉤子函數(shù)->發(fā)送完畢后執(zhí)行鉤子函數(shù) if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context); }

3.5發(fā)送批量消息

批量消息發(fā)送時(shí)序圖:

批量消息發(fā)送是將一個(gè)主題的多條消息進(jìn)行打包后發(fā)送到消息消費(fèi)端,以此減少網(wǎng)絡(luò)調(diào)用,提高網(wǎng)絡(luò)傳輸以及消息發(fā)送效率。但是,同一批次的消息數(shù)量不是越多越好,如果消息內(nèi)容過長,則打包消息過程中會(huì)導(dǎo)致占用線程資源時(shí)間過長,從而導(dǎo)致其他線程發(fā)送消息響應(yīng)時(shí)間過長,并且單批次消息總長度不能超過DefaultMQProducer#maxMessageSize -> 4M。

DefaultMQProducer#send

public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//調(diào)用batch方法將消息進(jìn)行打包后進(jìn)行發(fā)送return this.defaultMQProducerImpl.send(batch(msgs)); } //繼承Message->其實(shí)就是用一個(gè)List將多個(gè)消息存封裝起來->上述代碼中的batch方法作用就是否則將消息封裝成MessageBatch public class MessageBatch extends Message implements Iterable<Message> {private static final long serialVersionUID = 621335151046335557L;private final List<Message> messages; }

DefaultMQProducer#batch

MessageBatch msgBatch; try {//將消息集合封裝到MessageBatch.messagesmsgBatch = MessageBatch.generateFromList(msgs);//遍歷消息for (Message message : msgBatch) {//對消息一一進(jìn)行檢查Validators.checkMessage(message, this);//對每個(gè)消息設(shè)置唯一ID和TopicMessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//編碼后存入Message.bodymsgBatch.setBody(msgBatch.encode()); } catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e); } //設(shè)置msgBatch的主題Topic msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch;

將消息封裝成MessageBatch之后的消息發(fā)送步驟跟單條消息的發(fā)送步驟完全一致,至此消息發(fā)送已經(jīng)完成。

以上。

本文僅作為個(gè)人學(xué)習(xí)使用,水平有限,如有錯(cuò)誤請指正!

總結(jié)

以上是生活随笔為你收集整理的RocketMQ:Producer启动流程与消息发送源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。