RocketMq案例,生产者,消费者,消息订阅
1、RocketMq集群配置參考:
http://blog.csdn.net/tototuzuoquan/article/details/78314572
使用的rocketmq的ip等是上面博文提及的ip等內容
2、創建RocketMq工程
工程目錄結構如下:
3、編寫pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.toto.rocketmq</groupId>
    <artifactId>rocketmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>4.0.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.0.0-incubating</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4、編寫demo.one下的Producer,代碼如下:
package demo.one;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.UUID;

/**
 * Minimal producer demo: starts a producer, sends one message to the topic
 * {@code qch_20170706} and shuts the producer down again.
 */
public class Producer {

    /**
     * Sends a single "Hello RocketMQ!" message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the producer.
        DefaultMQProducer producer = new DefaultMQProducer("pro_qch_test");

        // Configure the producer.
        producer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        producer.setInstanceName(UUID.randomUUID().toString());

        // Start the producer; nothing to clean up if it never started.
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }

        // Produce the message. The producer is shut down in the finally block so
        // that a failed send no longer leaves the client running.
        boolean sent = false;
        try {
            String str = "Hello RocketMQ!------" + UUID.randomUUID().toString();
            Message msg = new Message("qch_20170706", str.getBytes());
            producer.send(msg);
            sent = true;
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Stop the producer.
            producer.shutdown();
        }

        // Report success only when the send actually went through.
        if (sent) {
            System.out.println("[-----------]Success\n");
        }
    }
}
運行結果如下:
5、編寫demo.one下的ProducerTest
package demo.one;import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.UUID;public class ProducerTest {private static DefaultMQProducer producer = null;public static void main(String[] args) {System.out.print("[----------]Start\n");int pro_count = 1;if (args.length > 0) {pro_count = Integer.parseInt(args[0]);}boolean result = false;try {ProducerStart();for (int i = 0; i < pro_count; i++) {String msg = "hello rocketmq "+ i + "----" + UUID.randomUUID().toString();SendMessage("qch_20170706", msg);System.out.print(msg + "\n");}}finally {producer.shutdown();}System.out.print("[----------]Succeed\n");}private static boolean ProducerStart() {producer = new DefaultMQProducer("pro_qch_test");producer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");producer.setInstanceName(UUID.randomUUID().toString());try {producer.start();} catch(MQClientException e) {e.printStackTrace();return false;}return true;}private static boolean SendMessage(String topic, String str) {Message msg = new Message(topic, str.getBytes());try {producer.send(msg);} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();return false;}return true;} }通過這個類實現批量發送消息
此類需要通過args傳入參數(要發送的消息條數),接下來在IDEA中模擬輸入參數:
運行結果:
5、編寫demo.one下的ConsumerTest
package demo.one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.UUID;

/**
 * Push-consumer demo that prints the body of every message received on the
 * topic {@code qch_20170706}.
 */
public class ConsumerTest {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("con_qch_test");
        pushConsumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        pushConsumer.setInstanceName(UUID.randomUUID().toString());
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        pushConsumer.setConsumeMessageBatchMaxSize(32);
        pushConsumer.registerMessageListener(new BodyPrintingListener());

        try {
            pushConsumer.subscribe("qch_20170706", "*");
            pushConsumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Listener that prints each message body followed by a separator line per batch. */
    private static final class BodyPrintingListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch,
                                                        ConsumeConcurrentlyContext context) {
            for (MessageExt message : batch) {
                String body = new String(message.getBody());
                System.out.print(body + "\n");
            }
            System.out.println("=====================================");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}
運行結果:
6、編寫demo.one下的PullConsumer
在rocketmq里,consumer被分為2類:MQPullConsumer和MQPushConsumer,其實本質都是拉模式(pull),即consumer輪詢從broker拉取消息。
區別是:
push方式里,consumer把輪詢過程封裝了,并注冊MessageListener監聽器,取到消息后,喚醒MessageListener的consumeMessage()來消費,對用戶而言,感覺消息是被推送過來的。
pull方式里,取消息的過程需要用戶自己寫,首先通過打算消費的Topic拿到MessageQueue的集合,遍歷MessageQueue集合,然后針對每個MessageQueue批量取消息,一次取完后,記錄該隊列下一次要取的開始offset,直到取完了,再換另一個MessageQueue.
package demo.one;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Pull-consumer demo: walks every message queue of the topic
 * {@code qch_20170706}, pulling batches of up to 32 messages and remembering
 * the next offset per queue until a queue reports no new messages.
 */
public class PullConsumer {

    /** Next offset to pull from, per message queue (in-memory only). */
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    /**
     * Starts the pull consumer, drains every queue of the topic once and
     * shuts the consumer down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumerGroup");
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            // A consumer that failed to start cannot fetch queues; stop here.
            return;
        }

        try {
            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("qch_20170706");
            for (MessageQueue mq : mqs) {
                // printf, not println: "%n" is only expanded by a format call.
                System.out.printf("Consume from the queue: %s%n", mq);
                SINGLE_MQ:
                while (true) {
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        System.out.printf("%s%n", pullResult);
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                        switch (pullResult.getPullStatus()) {
                            case NO_NEW_MSG:
                                // Queue is drained; move on to the next one.
                                break SINGLE_MQ;
                            case FOUND:
                            case NO_MATCHED_MSG:
                            case OFFSET_ILLEGAL:
                            default:
                                break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Give up on this queue instead of retrying forever when
                        // the pull keeps failing (e.g. broker unreachable).
                        break SINGLE_MQ;
                    }
                }
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        } finally {
            // Release the client's resources so the JVM can exit.
            consumer.shutdown();
        }
    }

    /**
     * Returns the next offset to pull from for the given queue.
     *
     * @param mq the message queue
     * @return the stored offset, or 0 if the queue has not been pulled yet
     */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    /**
     * Records the next offset to pull from for the given queue.
     *
     * @param mq     the message queue
     * @param offset the next begin offset reported by the last pull
     */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}
運行后的結果是:
7、編寫demo.one下的PushConsumer
package demo.one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.UUID;

/**
 * Broadcasting push-consumer demo: prints every message received on the topic
 * {@code qch_20170706} for one minute, then shuts down.
 */
public class PushConsumer {

    /**
     * Runs the consumer for 60 seconds.
     *
     * @param args unused
     * @throws MQClientException    if subscribing or starting the consumer fails
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Create the consumer.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("con_group_1");

        // Configure the consumer.
        pushConsumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        pushConsumer.setInstanceName(UUID.randomUUID().toString());
        pushConsumer.setMessageModel(MessageModel.BROADCASTING);
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        pushConsumer.setConsumeMessageBatchMaxSize(32);

        // Consume messages: print each body prefixed with "msg=".
        pushConsumer.registerMessageListener((MessageListenerConcurrently) (batch, context) -> {
            batch.forEach((MessageExt message) ->
                    System.out.print("msg=" + new String(message.getBody()) + "\n"));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // Start the consumer.
        pushConsumer.subscribe("qch_20170706", "*");
        pushConsumer.start();

        // Stop the consumer after one minute.
        Thread.sleep(60000);
        pushConsumer.shutdown();
    }
}
運行結果:
8、編寫 demo.one下的PushConsumerTest
package demo.one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Broadcasting push-consumer demo that numbers every message it receives on
 * the topic {@code qch_20170706}.
 */
public class PushConsumerTest {

    /**
     * Running total of consumed messages. Atomic because a concurrent message
     * listener may be invoked from several consumer threads at once, and a
     * plain {@code count++} would lose updates.
     */
    private static final AtomicInteger COUNT = new AtomicInteger();

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.print("Push Consumer main start!\n");
        COUNT.set(0);

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_group_1");
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeMessageBatchMaxSize(32);
        consumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.print("list count=" + list.size() + "\n");
                for (MessageExt me : list) {
                    int current = COUNT.incrementAndGet();
                    System.out.print("count=" + current + ", msg=" + new String(me.getBody()) + "\n");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.subscribe("qch_20170706", "*");
            consumer.start();
            System.out.print("Push Consumer Started!\n");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
運行結果:
9、編寫PushConsumerThreadPollTest
package demo.one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Starts several push consumers of the same group from a thread pool to show
 * how messages are shared between them in clustering mode.
 */
public class PushConsumerThreadPollTest {

    /**
     * Launches three consumer tasks, waits, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int threadCount = 3;
        int waitTime = 60000; // milliseconds

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            Runnable runner = new ExecutorThread(String.valueOf(i));
            executor.execute(runner);
        }

        try {
            Thread.sleep(waitTime);
            executor.shutdown();
            // waitTime is in milliseconds; MICROSECONDS would wait only 60 ms.
            executor.awaitTermination(waitTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Stop the pool and preserve the interrupt for the caller.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

/** Task that starts one named push consumer on the topic {@code qch_20170706}. */
class ExecutorThread implements Runnable {

    private final String name;

    /**
     * Messages consumed by this instance. Atomic because a concurrent message
     * listener may be invoked from several consumer threads at once.
     */
    private final AtomicInteger count = new AtomicInteger();

    ExecutorThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        startPushConsumer();
    }

    /** Configures and starts the push consumer; failures are printed, not rethrown. */
    private void startPushConsumer() {
        System.out.print("Consumer Name=" + name + "\n");
        count.set(0);

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_group_1");
        consumer.setInstanceName(UUID.randomUUID().toString());
        // Clustering consumption; use MessageModel.BROADCASTING here instead for
        // broadcast consumption.
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeMessageBatchMaxSize(32);
        consumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.print("list count=" + list.size() + "\n");
            for (MessageExt me : list) {
                int current = count.incrementAndGet();
                System.out.print("name=" + name + ", count=" + current + ", msg="
                        + new String(me.getBody()) + "\n");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        try {
            consumer.subscribe("qch_20170706", "*");
            consumer.start();
            System.out.print("Consumer started. name=" + name + "\n");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
運行的結果:
10、編寫demo.two中producer
package demo.two;import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException {/*** 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例* 注意:ProducerGroupName需要由應用來保證唯一* ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,* 因為服務器會回查這個Group下的任意一個Producer*/DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");/*** Producer對象在使用之前必須要調用start初始化,初始化一次即可* 注意:切記不可以在每次發送消息時,都調用start方法。*/producer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");producer.start();/*** 下面這段代碼表明一個Producer對象可以發送多個topic,多個tag的消息。* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬于成功,但是對于個別應用如果對消息可靠性要求極高,* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。*/try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID001",// key("Hello MetaQ").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest2",// topic"TagB",// tag"OrderID0034",// key("Hello MetaQ").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest3",// topic"TagC",// tag"OrderID061",// key("Hello MetaQ").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);}} catch (Exception e){e.printStackTrace();}/*** 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從MetaQ服務器上注銷自己* 注意:我們建議應用在JBoss、Tomcat等容器的退出鉤子里調用shutdown方法。*/producer.shutdown();} }運行結果:
11、編寫demo.two下的consumer
package demo.two;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Push-consumer demo that subscribes to three topics and dispatches each
 * message by topic and tag.
 */
public class Consumer {

    /**
     * This example shows PushConsumer usage: to the user it feels as if messages
     * are pushed from the RocketMQ server to the client. Internally the
     * PushConsumer pulls from the broker with long polling and then calls back
     * the user's listener.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        /*
         * An application creates one Consumer and maintains it itself; it can be
         * a global object or a singleton.
         * Note: the application must keep ConsumerGroupName unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");

        // Subscribe to messages of the topic whose tag is TagA, TagC or TagD.
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

        // Subscribe to all messages of a topic.
        // Note: one consumer object can subscribe to several topics.
        consumer.subscribe("TopicTest2", "*");
        consumer.subscribe("TopicTest3", "*");

        /*
         * Whether a Consumer started for the first time consumes from the head
         * or the tail of the queue. On later starts it continues from the last
         * consumed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages:" + msgs);
                // Handle every message of the batch, not only the first one, so
                // nothing is acknowledged unprocessed if the batch size is raised.
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    dispatch(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // A Consumer must be initialised with start() before use; once is enough.
        consumer.start();
        System.out.println("Consumer Started.");
    }

    /**
     * Runs the consumption logic matching the message's topic and tag.
     * Constant-first {@code equals} keeps the comparison null-safe; the original
     * TagA branch guarded the topic instead of the tags.
     *
     * @param msg the received message
     */
    private static void dispatch(MessageExt msg) {
        String topic = msg.getTopic();
        String tags = msg.getTags();

        if ("TopicTest1".equals(topic)) {
            // Consumption logic for TopicTest1.
            if ("TagA".equals(tags)) {
                System.out.println("執行TagA的消費");
            } else if ("TagC".equals(tags)) {
                System.out.println("執行TagC的消費");
            } else if ("TagD".equals(tags)) {
                System.out.println("執行TagD的消費");
            }
        } else if ("TopicTest2".equals(topic)) {
            // Consumption logic for TopicTest2.
            System.out.println("執行TopicTest2的消費邏輯");
        } else if ("TopicTest3".equals(topic)) {
            System.out.println("執行TopicTest3的消費邏輯");
        }
    }
}
運行結果:
12、編寫demo.three中的producer
package demo.three;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.Date;

/**
 * Producer demo that sends ten timestamped messages to the topic
 * {@code TopicA-test}, one per second.
 */
public class Producer {

    /**
     * Starts the producer, sends the messages and shuts down.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer mqProducer = new DefaultMQProducer("rmq-group");

        /*
         * By default only one Producer or Consumer instance can be started per
         * server, so to run several instances on one server each needs its own
         * instance name.
         */
        mqProducer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        mqProducer.setInstanceName("producer");
        mqProducer.start();

        try {
            int sequence = 0;
            while (sequence < 10) {
                // Send one message per second.
                Thread.sleep(1000);

                String text = new Date() + "Hello RocketMQ,QuickStart" + sequence;
                Message message = new Message("TopicA-test", "TagA", text.getBytes());
                SendResult result = mqProducer.send(message);
                System.out.println(result);

                sequence++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        mqProducer.shutdown();
    }
}
運行結果:
13、編寫demo.three中的consumer
package demo.three;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Push-consumer demo that prints the body of every {@code TagA} message
 * received on the topic {@code TopicA-test}.
 */
public class Consumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("rmq-group");
        pushConsumer.setNamesrvAddr("192.168.106.101:9876;192.168.106.102:9876");
        pushConsumer.setInstanceName("consumer");
        pushConsumer.subscribe("TopicA-test", "TagA");

        // Print each message body on its own line and acknowledge the batch.
        MessageListenerConcurrently bodyPrinter =
                (List<MessageExt> batch, ConsumeConcurrentlyContext context) -> {
                    batch.forEach(message -> System.out.println(new String(message.getBody())));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                };
        pushConsumer.registerMessageListener(bodyPrinter);

        pushConsumer.start();
        System.out.println("Consumer Started.");
    }
}
運行后的代碼結果是:
總結
以上是生活随笔為你收集整理的RocketMq案例,生产者,消费者,消息订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rocketmq集群安装部署过程(4.0
- 下一篇: RocketMQ特性、专业术语(Prod