RocketMQ源码分析之request-reply特性
1.什么是request-reply?
??RocketMQ4.6.0版本中增加了request-reply新特性,該特性允許producer在發(fā)送消息后同步或者異步等待consumer消費(fèi)完消息并返回響應(yīng)消息,類似rpc調(diào)用效果。
2. 使用場(chǎng)景
- 快速搭建服務(wù)總線,實(shí)現(xiàn)rpc框架
- 調(diào)用鏈追蹤分析
- 跨網(wǎng)絡(luò)區(qū)域?qū)崿F(xiàn)系統(tǒng)間同步調(diào)用
3.使用方法
- producer端
??producer端調(diào)用request(final Message msg, final long timeout)方法以同步方式等待consumer端消費(fèi)完消息并返回響應(yīng)消息;調(diào)用request(final Message msg, final RequestCallback requestCallback, final long timeout)方法以異步方式等待consumer端消費(fèi)完消息并返回響應(yīng)消息。
同步方式:
public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "RequestTopic0218";String topic = "RequestTopic";long ttl = 300000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();} }異步方式:
public class AsyncRequestProducer {private static final InternalLogger log = ClientLogger.getLog();public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "AsynRequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();producer.request(msg, new RequestCallback() {@Overridepublic void onSuccess(Message message) {long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);}@Overridepublic void onException(Throwable e) {System.err.printf("request to <%s> fail.", topic);}}, ttl);} catch (Exception e) {log.warn("", e);}/* shutdown after your request callback is finished */ // producer.shutdown();} }- consumer端
??consumer端程序在原來(lái)的基礎(chǔ)上會(huì)增加以下內(nèi)容:
??(1)創(chuàng)建producer用來(lái)發(fā)送消息
??(2)在消費(fèi)完消息后調(diào)用RocketMQ提供的MessageUtil.createReplyMessage(final Message requestMessage, final byte[] body)方法來(lái)構(gòu)建響應(yīng)消息
??(3)調(diào)用send方法將響應(yīng)消息發(fā)回給生產(chǎn)者
public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "ReplyProducer0218";String consumerGroup = "ResponseConsumer0218";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 300000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();System.out.printf("Consumer Started.%n");} }在RocketMQ中producer端可以通過(guò)調(diào)用以下兩個(gè)方法發(fā)送消息并等待consumer端返回響應(yīng)消息:
- request(final Message msg, final long timeout)
- request(final Message msg, final RequestCallback requestCallback, final long timeout)
下面以producer同步等待consumer響應(yīng)消息為例分析整個(gè)request-reply的過(guò)程:
public Message request(Message msg,long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginTimestamp = System.currentTimeMillis();prepareSendRequest(msg, timeout);final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);try {final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);long cost = System.currentTimeMillis() - beginTimestamp;this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {requestResponseFuture.setSendRequestOk(true);}@Overridepublic void onException(Throwable e) {requestResponseFuture.setSendRequestOk(false);requestResponseFuture.putResponseMessage(null);requestResponseFuture.setCause(e);}}, timeout - cost);return waitResponse(msg, timeout, requestResponseFuture, cost);} finally {RequestFutureTable.getRequestFutureTable().remove(correlationId);}}(1)獲取系統(tǒng)當(dāng)前時(shí)間,方便后續(xù)進(jìn)行超時(shí)判斷
(2)調(diào)用prepareSendRequest(final Message msg, long timeout)函數(shù)將待發(fā)送給broker的消息進(jìn)行改造,具體改造如下:
- 調(diào)用CorrelationIdUtil.createCorrelationId()生成該消息的correlationId,并將correlationId添加到消息的擴(kuò)展屬性CORRELATION_ID
- 獲取producer的clientId并將其添加到消息的擴(kuò)展屬性REPLY_TO_CLIENT,該屬性的作用在于后續(xù)consumer端發(fā)送響應(yīng)消息時(shí)broker知道將消息發(fā)送給哪個(gè)producer端
- 將超時(shí)時(shí)間添加到消息的擴(kuò)展屬性TTL
(3)構(gòu)建RequestResponseFuture對(duì)象,這里需要詳細(xì)解釋RequestResponseFuture對(duì)象,RequestResponseFuture是實(shí)現(xiàn)request-reply特性的關(guān)鍵,producer發(fā)送的每條消息都會(huì)new一個(gè)RequestResponseFuture對(duì)象:
- correlationId是CorrelationIdUtil.createCorrelationId()方法隨機(jī)生成的UUID字符串,correlationId是用來(lái)標(biāo)識(shí)從發(fā)送每條消息到conumer端發(fā)送響應(yīng)消息的請(qǐng)求
- requestMsg是consumer端返回的響應(yīng)消息
- countDownLatch在消息發(fā)送時(shí)會(huì)阻塞producer線程(調(diào)用了await實(shí)現(xiàn)阻塞),等到響應(yīng)消息返回時(shí)激活producer線程,最后返回consumer端響應(yīng)消息,所以雖然在內(nèi)部實(shí)現(xiàn)上是以異步方式發(fā)送消息但是結(jié)合countDownLatch達(dá)到了同步的效果
- 由于是同步發(fā)送所以requestCallback為null
(4)將<correlationId, requestResponseFuture>添加到requestFutureTable,后續(xù)consumer向broker發(fā)送RequestCode.SEND_REPLY_MESSAGE_V2請(qǐng)求將響應(yīng)消息發(fā)送到broker,broker在處理這個(gè)請(qǐng)求時(shí)會(huì)調(diào)用pushReplyMessage方法發(fā)送RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請(qǐng)求給producer,此時(shí)producer端會(huì)根據(jù)響應(yīng)消息中correlationId在requestFutureTable中獲取其對(duì)應(yīng)的requestResponseFuture,并且會(huì)將響應(yīng)消息賦給requestResponseFuture中的responseMsg。
(5)調(diào)用sendDefaultImpl方法以異步的方式發(fā)送消息,雖然是以異步方式發(fā)送消息但是結(jié)合RequestResponseFuture中的countDownLatch到達(dá)了同步效果。此時(shí)producer發(fā)送了RequestCode.SEND_MESSAGE請(qǐng)求給broker,broker后續(xù)的處理過(guò)程與發(fā)送普通消息是一樣的。
(6)consumer在正常消費(fèi)完消息后,需要調(diào)用MessageUtil.createReplyMessage方法構(gòu)建響應(yīng)消息,該方法有兩個(gè)參數(shù),分別是producer發(fā)送消息和響應(yīng)消息體內(nèi)容,該方法會(huì)從producer發(fā)送的消息的擴(kuò)展屬性中獲取“CLUSTER”、“REPLY_TO_CLIENT”、“CORRELATION_ID”和“TTL”,并根據(jù)這些擴(kuò)展屬性以及響應(yīng)消息體內(nèi)容構(gòu)建響應(yīng)消息。這里需要注意,新構(gòu)建的響應(yīng)消息的topic是由producer發(fā)送的消息的擴(kuò)展屬性中的CLUSTER與REPLY_TOPIC拼接起來(lái),即“集群名稱_REPLY_TOPIC”,這個(gè)是一個(gè)系統(tǒng)級(jí)別的topic,是由broker自己創(chuàng)建的。
public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {if (requestMessage != null) {Message replyMessage = new Message();String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);replyMessage.setBody(body);if (cluster != null) {String replyTopic = MixAll.getReplyTopic(cluster);replyMessage.setTopic(replyTopic);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);return replyMessage;} else {throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");}}throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");}(7)調(diào)用send方法發(fā)送響應(yīng)消息到broker,在發(fā)送的過(guò)程中會(huì)判斷消息的類型,由于該消息是reply類型的,所以向broker發(fā)送的請(qǐng)求類型是RequestCode.SEND_REPLY_MESSAGE_V2
(8)broker處理RequestCode.SEND_REPLY_MESSAGE_V2請(qǐng)求的是ReplyMessageProcessor,具體操作如下:
- 根據(jù)請(qǐng)求中響應(yīng)消息的topic、queueId、消息體內(nèi)容、消息標(biāo)記、消息的擴(kuò)展屬性、消息產(chǎn)生的時(shí)間、消息的來(lái)源等信息構(gòu)建MessageExtBrokerInner對(duì)象
- 調(diào)用pushReplyMessage方法構(gòu)建RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請(qǐng)求,然后根據(jù)消息擴(kuò)展屬性REPLY_TO_CLIENT獲取broker與producer連接的channel,最后將請(qǐng)求發(fā)送給producer。這里有個(gè)問(wèn)題:RocketMQ如何保證請(qǐng)求原路返回?首先producer產(chǎn)生的消息會(huì)發(fā)送到broker上,此時(shí)broker中存儲(chǔ)的producer產(chǎn)生的消息的擴(kuò)展屬性中是包含存儲(chǔ)的broker的集群名稱的,接著consumer消息該消息并根據(jù)該消息構(gòu)造出響應(yīng)消息,在構(gòu)造響應(yīng)消息時(shí),其topic是“集群名稱_REPLY_TOPIC”,這樣就保證了consumer在發(fā)送響應(yīng)消息到broker是原路返回,即這里的broker是與producer連接的broker。
- 判斷broker端的配置文件中storeReplyMessageEnable配置項(xiàng)的值是否為true,如果為true,則會(huì)將響應(yīng)消息存儲(chǔ)在broker端。storeReplyMessageEnable的默認(rèn)值是true。
(9)producer處理broker發(fā)送的RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請(qǐng)求的是ClientRemotingProcessor,具體如下:
- 根據(jù)請(qǐng)求還原響應(yīng)消息MessageExt
- 獲取響應(yīng)消息擴(kuò)展屬性CORRELATION_ID的值correlationId,在producer端的requestFutureTable中根據(jù)correlationId獲取該消息對(duì)應(yīng)的requestResponseFuture,然后將響應(yīng)消息放入到requestResponseFuture中的responseMsg并將countDownLatch的值減一,此時(shí)producer端調(diào)用request方法的線程就激活了
- 從requestFutureTable中刪除key為correlationId的數(shù)據(jù)項(xiàng)
(10)producer端調(diào)用request方法線程激活后會(huì)調(diào)用waitResponse方法返回requestResponseFuture中的responseMsg,這里最終調(diào)用的waitResponseMessage方法中帶有一個(gè)參數(shù):超時(shí)時(shí)間,如果到了超時(shí)時(shí)間后consumer端的響應(yīng)消息沒(méi)有被producer端收到,線程也會(huì)被激活,這樣的設(shè)置也是防止producer線程一直被阻塞。
參考資料: 官方視頻鏈接.
總結(jié)
以上是生活随笔為你收集整理的RocketMQ源码分析之request-reply特性的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Linux traceroute no
- 下一篇: 十大诱人垃圾食物