延迟消息
延時消息
比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存
啟動消息消費者
package com.leon.mq.rocketmq.delay;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.訂閱主題Topic和Tagconsumer.subscribe("DelayTopic", "*");//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息內容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("消息ID:【" + msg.getMsgId() + "】,延遲時間:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者consumerconsumer.start();System.out.println("消費者啟動");} }發送延時消息
package com.leon.mq.rocketmq.delay;import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {//1.創建消息生產者producer,并制定生產者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.啟動producerproducer.start();for (int i = 0; i < 10; i++) {//4.創建消息對象,指定主題Topic、Tag和消息體/*** 參數一:消息主題Topic* 參數二:消息Tag* 參數三:消息內容*/Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());//設定延遲時間//msg.setDelayTimeLevel(2);//5.發送消息SendResult result = producer.send(msg);//發送狀態SendStatus status = result.getSendStatus();System.out.println("發送結果:" + result);//線程睡1秒TimeUnit.SECONDS.sleep(1);}//6.關閉生產者producerproducer.shutdown();}}驗證
您將會看到消息的消費比存儲時間晚10秒
使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18
?
?
總結