日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 本地 mq_java rocketmq--消息的产生(普通消息)

發(fā)布時間:2023/12/15 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 本地 mq_java rocketmq--消息的产生(普通消息) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

與消息發(fā)送緊密相關的幾行代碼:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那這幾行代碼執(zhí)行時,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Override

public void start() throws MQClientException {

this.defaultMQProducerImpl.start();

}

調(diào)用了默認生成消息的實現(xiàn)類 -- DefaultMQProducerImpl

調(diào)用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會初始化得到MQClientInstance實例對象,MQClientInstance實例對象調(diào)用它自己的start方法會 ,啟動一些服務,如拉去消息服務PullMessageService.Start()、啟動負載平衡服務RebalanceService.Start(),比如網(wǎng)絡通信服務MQClientAPIImpl.Start()

另外,還會執(zhí)行與生產(chǎn)消息相關的信息,如注冊produceGroup、new一個TopicPublishInfo對象并以默認TopicKey為鍵值,構(gòu)成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,獲取的MQClientInstance實例對象會調(diào)用sendHeartbeatToAllBroker()方法,不斷向broker發(fā)送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

上圖中的三個部分中涉及的內(nèi)容:

1.1 初始化MQClientInstance

一個客戶端只能產(chǎn)生一個MQClientInstance實例對象,產(chǎn)生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動一些服務,源碼如下:

public void start() throws MQClientException {

synchronized (this) {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

// If not specified,looking address from name server

if (null == this.clientConfig.getNamesrvAddr()) {

this.mQClientAPIImpl.fetchNameServerAddr();

}

// Start request-response channel

this.mQClientAPIImpl.start();

// Start various schedule tasks

this.startScheduledTask();

// Start pull service

this.pullMessageService.start();

// Start rebalance service

this.rebalanceService.start();

// Start push service

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

log.info("the client factory [{}] start OK", this.clientId);

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

break;

case SHUTDOWN_ALREADY:

break;

case START_FAILED:

throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

default:

break;

}

}

}

1.2 注冊producer

該過程會將這個當前producer對象注冊到MQClientInstance實例對象的的producerTable中。一個jvm(一個客戶端)中一個producerGroup只能有一個實例,MQClientInstance操作producerTable大概有如下幾個方法:

-- selectProducer

-- updateTopicRouteInfoFromNameServer

-- prepareHeartbeatData

-- isNeedUpdateTopicRouteInfo

-- shutdown

注:

根據(jù)不同的clientId,MQClientManager將給出不同的MQClientInstance;

根據(jù)不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定義:

public class DefaultMQProducerImpl implements MQProducerInner {

private final Logger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();

它是一個以topic為key的Map型數(shù)據(jù)結(jié)構(gòu),DefaultMQProducerImpl.start()時會默認創(chuàng)建一個key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 發(fā)送心跳包

MQClientInstance向broker發(fā)送心跳包時,調(diào)用sendHeartbeatToAllBroker( ),以及從MQClientInstance實例對象的brokerAddrTable中拿到所有broker地址,向這些broker發(fā)送心跳包。

sendHeartbeatToAllBroker會涉及到prepareHeartbeatData()方法,該方法會生成heartbeatData數(shù)據(jù),發(fā)送心跳包時,heartbeatData作為心跳包的body。與producer相關的部分代碼如下:

// Producer

for (Map.Entry entry : this.producerTable.entrySet()) {

MQProducerInner impl = entry.getValue();

if (impl != null) {

ProducerData producerData = new ProducerData();

producerData.setGroupName(entry.getKey());

heartbeatData.getProducerDataSet().add(producerData);

}

二、. SendResult sendResult = producer.send(msg)

首先會調(diào)用DefaultMQProducer.send(msg) ,繼而調(diào)用sendDefaultImpl:

public SendResult send(Message msg,

long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

}

sendDefaultImpl做了啥?

2.1. 獲取topicPublishInfo

根據(jù)msg的topic從topicPublishInfoTable獲取對應的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 選擇消息發(fā)送的隊列

普通消息:默認方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發(fā)送消息,默認采用長輪詢的方式選擇隊列 。

它的機制如下:正常情況下,順序選擇queue進行發(fā)送;如果某一個節(jié)點發(fā)生了超時,則下次選擇queue時,跳過相同的broker。不同的隊列選擇策略形成了生產(chǎn)消息的幾種模式,如順序消息,事務消息。

順序消息:將一組需要有序消費的消息發(fā)往同一個broker的同一個隊列上即可實現(xiàn)順序消息,假設相同訂單號的支付,退款需要放到同一個隊列,那么就可以在send的時候,自己實現(xiàn)MessageQueueSelector,根據(jù)參數(shù)arg字段來選擇queue。

private SendResult sendSelectImpl(

Message msg,

MessageQueueSelector selector,

Object arg,

final CommunicationMode communicationMode,

final SendCallback sendCallback, final long timeout

) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事務消息:只有在消息發(fā)送成功,并且本地操作執(zhí)行成功時,才發(fā)送提交事務消息,做事務提交,消息發(fā)送失敗,直接發(fā)送回滾消息,進行回滾,具體如何實現(xiàn)后面會單獨成文分析。

2.3 封裝消息體通信包,發(fā)送數(shù)據(jù)包

首先,根據(jù)獲取的MessageQueue中的getBrokerName,調(diào)用findBrokerAddressInPublish得到該消息存放對應的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知獲取的broker均為master(id=0)

然后, 將與該消息相關信息打包成RemotingCommand數(shù)據(jù)包,其RequestCode.SEND_MESSAGE

根據(jù)獲取的broke地址,將數(shù)據(jù)包到對應的broker,默認是發(fā)送超時時間為3s。

封裝消息請求包的包頭:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

requestHeader.setTopic(msg.getTopic());

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setSysFlag(sysFlag);

requestHeader.setBornTimestamp(System.currentTimeMillis());

requestHeader.setFlag(msg.getFlag());

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());

requestHeader.setBatch(msg instanceof MessageBatch);

發(fā)送消息包(普通消息默認為同步方式):

SendResult sendResult = null;

switch (communicationMode) {

case SYNC:

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

msg,

requestHeader,

timeout,

communicationMode,

context,

this);

break;

處理來自broker端的響應數(shù)據(jù)包:

private SendResult sendMessageSync(

final String addr,

final String brokerName,

final Message msg,

final long timeoutMillis,

final RemotingCommand request

) throws RemotingException, MQBrokerException, InterruptedException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

return this.processSendResponse(brokerName, msg, response);

}

broker端處理request數(shù)據(jù)包后會將消息存儲到commitLog,具體過程后續(xù)分析。

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

總結(jié)

以上是生活随笔為你收集整理的java 本地 mq_java rocketmq--消息的产生(普通消息)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。