快速学习-RocketMQ-“Request-Reply”特性
“Request-Reply”特性
1 使用場景
隨著服務規模的擴大,單機服務無法滿足性能和容量的要求,此時需要將服務拆分為更小粒度的服務或者部署多個服務實例構成集群來提供服務。在分布式場景下,RPC是最常用的聯機調用的方式。
在構建分布式應用時,有些領域,例如金融服務領域,常常使用消息隊列來構建服務總線,實現聯機調用的目的。消息隊列的主要場景是解耦、削峰填谷,在聯機調用的場景下,需要將服務的調用抽象成基于消息的交互,并增強同步調用的這種交互邏輯。為了更好地支持消息隊列在聯機調用場景下的應用,rocketmq-4.7.0推出了“Request-Reply”特性來支持RPC調用。
2 設計思路
在rocketmq中,整個同步調用主要包括兩個過程:
(1)請求方生成消息,發送給響應方,并等待響應方回包;
(2)響應方收到請求消息后,消費這條消息,并發出一條響應消息給請求方。
整個過程實質上是兩個消息收發過程的組合。所以這里最關鍵的問題是如何將異步的消息收發過程構建成一個同步的過程。其中主要有兩個問題需要解決:
2.1 請求方如何同步等待回包
這個問題的解決方案中,一個關鍵的數據結構是RequestResponseFuture。
public class RequestResponseFuture {private final String correlationId;private final RequestCallback requestCallback;private final long beginTimestamp = System.currentTimeMillis();private final Message requestMsg = null;private long timeoutMillis;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile Message responseMsg = null;private volatile boolean sendRequestOk = true;private volatile Throwable cause = null; }RequestResponseFuture中,利用correlationId來標識一個請求。如下圖所示,Producer發送request時創建一個RequestResponseFuture,以correlationId為key,RequestResponseFuture為value存入map,同時請求中帶上RequestResponseFuture中的correlationId,收到回包后根據correlationId拿到對應的RequestResponseFuture,并設置回包內容。
2.2 consumer消費消息后,如何準確回包
(1)producer在發送消息的時候,會給每條消息生成唯一的標識符,同時還帶上了producer的clientId。當consumer收到并消費消息后,從消息中取出消息的標識符correlationId和producer的標識符clientId,放入響應消息,用來確定此響應消息是哪條請求消息的回包,以及此響應消息應該發給哪個producer。同時響應消息中設置了消息的類型以及響應消息的topic,然后consumer將消息發給broker,如下圖所示。
(2)broker收到響應消息后,需要將消息發回給指定的producer。Broker如何知道發回給哪個producer?因為消息中包含了producer的標識符clientId,在ProducerManager中,維護了標識符和channel信息的對應關系,通過這個對應關系,就能把回包發給對應的producer。
響應消息發送和一般的消息發送流程區別在于,響應消息不需要producer拉取,而是由broker直接推給producer。同時選擇broker的策略也有變化:請求消息從哪個broker發過來,響應消息也發到對應的broker上。
Producer收到響應消息后,根據消息中的唯一標識符,從RequestResponseFuture的map中找到對應的RequestResponseFuture結構,設置響應消息,同時計數器減一,解除等待狀態,使請求方收到響應消息。
3 使用方法
同步調用的示例在example文件夾的rpc目錄下。
3.1 Producer
Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);調用接口替換為request即可。
3.2 Consumer
需要啟動一個producer,同時在覆寫consumeMessage方法的時候,自定義響應消息并發送。
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 3000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}4 接口參數
4.1 public Message request(Message msg,long timeout)
msg:待發送的消息
timeout:同步調用超時時間
4.2 public void request(Message msg, final RequestCallback requestCallback, long timeout)
msg:待發送的消息
requestCallback:回調函數
timeout:同步調用超時時間
4.3 public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout)
msg:待發送的消息
selector:消息隊列選擇器
arg:消息隊列選擇器需要的參數
timeout:同步調用超時時間
4.4 public void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback, final long timeout)
msg:待發送的消息
selector:消息隊列選擇器
arg:消息隊列選擇器需要的參數
requestCallback:回調函數
timeout:同步調用超時時間
4.5 public Message request(final Message msg, final MessageQueue mq, final long timeout)
msg:待發送的消息
mq:目標消息隊列
timeout:同步調用超時時間
4.6 public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
msg:待發送的消息
mq:目標消息隊列
requestCallback:回調函數
timeout:同步調用超時時間
總結
以上是生活随笔為你收集整理的快速学习-RocketMQ-“Request-Reply”特性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: phpmailer的使用方法
- 下一篇: Qt QNetworkAccessMan