RocketMQ Filtersrv
RocketMQ Filtersrv in Detail
RocketMQ Getting Started Guide
// Consumer.java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        String group_name = "filter_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("localhost:9876");

        // Read the source of the filter class so it can be sent to the filter server.
        String filterCode = MixAll.file2String("C:\\JavaEE_Workspace\\RocketMQ\\src\\main\\java\\com\\aztech\\filter\\MessageFilterImpl.java");
        // System.out.println("filterCode: " + filterCode);

        /**
         * Filter messages on the server side using Java code.
         * The three-argument subscribe passes the filter class name together with its source;
         * the two-argument form would treat the class name as a tag expression instead.
         */
        consumer.subscribe("TopicFilter7", MessageFilterImpl.class.getCanonicalName(), filterCode);
        // consumer.subscribe("TopicFilter7", "com.aztech.filter.MessageFilterImpl", filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                for (MessageExt me : msgs) {
                    System.out.println("Received message: " + new String(me.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /**
         * The Consumer must be initialized by calling start() before use; calling it once is enough.
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

// Producer.java
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "filter_producer";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("TopicFilter7", // topic
                        "TagA", // tag
                        "OrderID001", // key
                        ("Hello RocketMQ" + i).getBytes(StandardCharsets.UTF_8)); // body
                // The filter reads this user property on the server side.
                msg.putUserProperty("SequenceId", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

// MessageFilterImpl.java
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;

public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        // Follow the convention where possible: read custom properties with getUserProperty.
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            // if ((id % 3) == 0 && (id > 10)) {
            if ((id % 2) == 0) {
                return true;
            }
        }
        return false;
    }
}
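To see which messages the filter lets through, the predicate in MessageFilterImpl can be tried without a broker. The following is only a minimal sketch: the class name FilterSketch and the plain property map standing in for MessageExt are assumptions made for illustration and are not part of RocketMQ. It replays the SequenceId values that the Producer sets (0 through 9) and keeps the even ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; it does not use any RocketMQ types.
public class FilterSketch {

    // Same rule as MessageFilterImpl.match, applied to a plain map of
    // user properties (a stand-in for MessageExt, for illustration only).
    static boolean match(Map<String, String> userProperties) {
        String property = userProperties.get("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            return (id % 2) == 0;
        }
        return false;
    }

    // Builds the SequenceId values 0..count-1, as the Producer does,
    // and returns the ids whose properties pass the filter.
    static List<Integer> matchingIds(int count) {
        List<Integer> matched = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> props = new HashMap<>();
            props.put("SequenceId", String.valueOf(i));
            if (match(props)) {
                matched.add(i);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchingIds(10)); // prints [0, 2, 4, 6, 8]
    }
}
```

Under these assumptions, the consumer would be expected to receive only the five messages whose SequenceId is even, while a message without the SequenceId property would be dropped.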