日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

發布時間:2024/9/19 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

有序消息

消息有序指的是可以按照消息的發送順序來消費。
RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。

順序消息生產者

public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest2", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 發送消息時,需要實現MessageQueueSelector , 用來選擇合適的queueSendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;// int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}

上面實現的順序消息時,通過orderId來進行順序消息,同一個訂單ID的消息,發送到同一個Queue里面

順序消息消費者

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");// 設置NameServer地址consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}

需要注意,registerMessageListener 注冊的消息監聽器 , 需要使用MessageListenerOrderly , ConsumeOrderlyContext , 不可以使用

MessageListenerConcurrently , ConsumeConcurrentlyContext , 否則消費的順序無法保證。

源碼分析

/*** @param msg 消息* @param selector 消息隊列選擇器* @param arg 分片值 (類似分庫分表里面的分片鍵)*/ @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg); }

實際發送

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);}private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 1. 獲取topic信息,TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 2. 獲取當前topic的內部隊列信息List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 復制一個消息Message userMessage = MessageAccessor.cloneMessage(msg);// topic信息String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//3. 獲取消息隊列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 獲取到隊列了,執行發送消息, 跟普通消息的發送一樣的return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

步驟說明:

  • 獲取當前topic的信息,內部包含消息隊列
  • 獲取topic內部的隊列信息
  • 獲取消息隊列,這個其實就是順序消息實現的核心點selector.select(messageQueueList, userMessage, arg) ,通過自定義的消息隊列選擇器,返回相應的隊列。 內部完全自定義
  • 獲取到了消息隊列之后,執行發送消息,跟普通消息一樣,這里就沒有什么重試的說法了。
  • 總結: 順序消息的核心就是將你希望按照順序的消息,通過某種特定的條件,計算發送到對應的隊列里面去。

    順序消息的缺點:

  • 送順序消息無法利用集群的Failover特性,因為不能更換broker,MessageQueue進行重試
  • 存在隊列熱點問題,當一個場景下消息非常多的情況,會導致個別隊列非常繁忙
  • 消費失敗時無法跳過, 會導致消費停止
  • 消息的并行度依賴于對列數量,不過可以增加隊列數量,動態調整
  • 思考: 通過上面那種順序消息的模式,在broker發生宕機 , 隊列數量發生變化時,會造成消費亂序

    比如在多master集群的情況下 ,

    topic: TP_TEST 總共8個隊列MASTER-1 : 1,2,3,4 MASTER-2 : 5,6,7,8

    一個topic分別在多個master上面有隊列, 如果其中一個master宕機了,那么隊列數會變成4個,那么順序消息通過 orderId % queueSize 的這種方式,會造成原來往一個隊列里面發送的,會發送到另外一個隊列里面去,造成消費亂序。

    所以如果是要嚴格的順序消息,則不要使用rocketMq, 在極端情況下會造成消費亂序。

    http://weixin.qq.com/r/eC-YwJDE7s2RrdSj93pq (二維碼自動識別)

    總結

    以上是生活随笔為你收集整理的producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。