RocketMQ API使用简介、拉取机制
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ API使用简介、拉取机制
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
分布式開放消息系統(RocketMQ)的原理與實踐?
import java.text.SimpleDateFormat; import java.util.Date; import java.util.List;import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 聲明并初始化一個producer// 需要一個producer group名字作為構造方法的參數,這里為producer1DefaultMQProducer producer = new DefaultMQProducer("order_producer");producer.setNamesrvAddr("localhost:9876");//producer.createTopic(key, newTopic, queueNum);producer.start();//String[] tags = new String[] {"TagA","TagC","TagD"};Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 1; i <= 5; i++) {try {// 時間戳String body = dateStr + " order_0 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_0",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 0是隊列的下標},0);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}for (int i = 1; i <= 5; i++) {try {// 時間戳String body = dateStr + " order_1 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_1",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 1是隊列的下標},1);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}for (int i = 1; i <= 5; i++) {try {// 時間戳String body = dateStr + " order_2 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_2",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 2是隊列的下標},2);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();} } import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");consumer.setNamesrvAddr("localhost:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/**消費線程池最小數量:默認10**/consumer.setConsumeThreadMin(10);/**消費線程池最大數量:默認120**/consumer.setConsumeThreadMax(20);// 訂閱的主題,以及過濾的標簽內容consumer.subscribe("TopicTest", "*");//設置一個Listener,主要進行消息的邏輯處理consumer.registerMessageListener(new MessageListenerOrderly() {private Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 設置自動提交context.setAutoCommit(true);for(MessageExt msg: msgs) {System.out.println(msg + ",content:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});//調用start()方法啟動consumerconsumer.start();System.out.println("C1 Started.");} } import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;public class Consumer2 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");consumer.setNamesrvAddr("localhost:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/**消費線程池最小數量:默認10**/consumer.setConsumeThreadMin(10);/**消費線程池最大數量:默認120**/consumer.setConsumeThreadMax(20);// 訂閱的主題,以及過濾的標簽內容consumer.subscribe("TopicTest", "*");//設置一個Listener,主要進行消息的邏輯處理consumer.registerMessageListener(new MessageListenerOrderly() {private Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 設置自動提交context.setAutoCommit(true);for(MessageExt msg: msgs) {System.out.println(msg + ",content:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});//調用start()方法啟動consumerconsumer.start();System.out.println("C2 Started.");} }?
總結
以上是生活随笔為你收集整理的RocketMQ API使用简介、拉取机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ简介、环境搭建
- 下一篇: RocketMQ消息发送之pull和pu