日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ Filtersrv

發(fā)布時間:2024/4/13 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ Filtersrv 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ Filtersrv詳解

RocketMQ入門手冊

?

import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException, IOException {String group_name = "filter_consumer";DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);consumer.setNamesrvAddr("localhost:9876");String filterCode = MixAll.file2String("C:\\JavaEE_Workspace\\RocketMQ\\src\\main\\java\\com\\aztech\\filter\\MessageFilterImpl.java"); // System.out.println("filterCode: " + filterCode);/*** 使用Java代碼,在服務器做消息過濾*/consumer.subscribe("TopicFilter7", MessageFilterImpl.class.getCanonicalName()); // consumer.subscribe("TopicFilter7", "com.aztech.filter.MessageFilterImpl",filterCode);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);MessageExt me = msgs.get(0);try {System.out.println("收到信息:" + new String(me.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/*** Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>*/consumer.start();System.out.println("Consumer Started.");} } import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {String group_name = "filter_producer";DefaultMQProducer producer = new DefaultMQProducer(group_name);producer.setNamesrvAddr("localhost:9876");producer.start();try {for (int i = 0; i < 10; i++) {Message msg = new Message("TopicFilter7",// topic"TagA",// tag"OrderID001",// key("Hello RocketMQ" + i).getBytes());// bodymsg.putUserProperty("SequenceId", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.println(sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();} } import org.apache.rocketmq.common.filter.FilterContext; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext arg1) {// 盡量遵循規(guī)范,使用getUserPropertyString property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);//if ((id % 3) == 0 && (id > 10)) {if ((id % 2) == 0) {return true;}}return false;}}

?

總結

以上是生活随笔為你收集整理的RocketMQ Filtersrv的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。