RocketMQ快速入门之消息过滤器(用户自定义属性)
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ快速入门之消息过滤器(用户自定义属性)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
默認配置下,不支持自定義屬性,需要設置開啟:
#加入到broker的配置文件中 enablePropertyFilter=true package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException; import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("learn-consumer");consumer.setNamesrvAddr("localhost:9876");// 訂閱消息,接收的是所有消息 // consumer.subscribe("my-topic", "*");consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));}} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("接收到消息 -> " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 啟動消費者consumer.start();} } package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("learn");producer.setNamesrvAddr("localhost:9876");producer.start();//發送消息String msg = "這是一個用戶的消息, id = 1001";Message message = new Message("my-topic-filter", "delete", msg.getBytes("UTF-8"));message.putUserProperty("sex","女");message.putUserProperty("age","18");SendResult sendResult = producer.send(message);System.out.println("消息id:" + sendResult.getMsgId());System.out.println("消息隊列:" + sendResult.getMessageQueue());System.out.println("消息offset值:" + sendResult.getQueueOffset());System.out.println(sendResult);producer.shutdown();} }?
總結
以上是生活随笔為你收集整理的RocketMQ快速入门之消息过滤器(用户自定义属性)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ快速入门之手动创建top
- 下一篇: RocketMQ的Producer详解之