日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ快速入门之消息过滤器(用户自定义属性)

發布時間:2024/4/13 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ快速入门之消息过滤器(用户自定义属性) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

默認配置下,不支持自定義屬性,需要設置開啟:

#加入到broker的配置文件中 enablePropertyFilter=true package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException; import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("learn-consumer");consumer.setNamesrvAddr("localhost:9876");// 訂閱消息,接收的是所有消息 // consumer.subscribe("my-topic", "*");consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));}} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("接收到消息 -> " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 啟動消費者consumer.start();} } package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("learn");producer.setNamesrvAddr("localhost:9876");producer.start();//發送消息String msg = "這是一個用戶的消息, id = 1001";Message message = new Message("my-topic-filter", "delete", msg.getBytes("UTF-8"));message.putUserProperty("sex","女");message.putUserProperty("age","18");SendResult sendResult = producer.send(message);System.out.println("消息id:" + sendResult.getMsgId());System.out.println("消息隊列:" + sendResult.getMessageQueue());System.out.println("消息offset值:" + sendResult.getQueueOffset());System.out.println(sendResult);producer.shutdown();} }

?

總結

以上是生活随笔為你收集整理的RocketMQ快速入门之消息过滤器(用户自定义属性)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。