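RocketMQ: From Getting Started to Giving Up
Preface
RocketMQ is a lightweight distributed messaging and data processing platform. This article covers installing, configuring, and the basic use of RocketMQ.
Environment
Each RocketMQ release has its own JDK version requirement (see the source below). This article uses RocketMQ 4.6.1.
(Image source: http://rocketmq.apache.org/dowloading/releases/)
Concepts
Roles
The roles in RocketMQ fall into roughly four groups: NameServer, Broker, Producer, and Consumer.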
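Broker
The Broker is the core of RocketMQ.
- It receives messages from Producers and delivers them to Consumers
- It submits its own information to the Name Server
- It is RocketMQ's store-and-forward server
- On startup, every Broker node iterates over the Name Server list, opens a long-lived connection to each Name Server, registers its own information, and then reports periodically
If the Brokers are deployed as a cluster:
- For high availability, Brokers can be arranged in a Master/Slave structure: the Master is readable and writable, the Slave is read-only, and the Master replicates written data to the Slave
- One Master can have several Slaves, but one Slave belongs to exactly one Master
- A Master and its Slaves are tied together by sharing the same BrokerName and using different BrokerIds: BrokerId 0 means Master, any non-zero value means Slave
- To spread load across machines, several Master Brokers can be deployed
- Every Broker keeps a long-lived connection to all nodes of the Name Server cluster and periodically registers its Topic information with all of them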
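The BrokerName/BrokerId rule above can be illustrated with a small standalone sketch. It does not use the RocketMQ API; the class `BrokerGroups` and the `"name:id"` string format are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RocketMQ: shows how nodes that share a
// BrokerName form one group and how the BrokerId decides the role.
final class BrokerGroups {

    /** BrokerId 0 marks the Master; any other id marks a Slave. */
    static String roleOf(long brokerId) {
        return brokerId == 0 ? "MASTER" : "SLAVE";
    }

    /** Groups "brokerName:brokerId" entries by brokerName and labels each node with its role. */
    static Map<String, List<String>> group(String... nodes) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String node : nodes) {
            String[] parts = node.split(":");
            long id = Long.parseLong(parts[1]);
            groups.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(roleOf(id) + "#" + id);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(group("broker-a:0", "broker-a:1", "broker-b:0", "broker-b:1", "broker-b:2"));
    }
}
```

In a real deployment these two values would come from each Broker's configuration file rather than from strings like these.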
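Producer
- The message producer
- It opens a long-lived connection to one randomly chosen node of the Name Server cluster and fetches the Topic's routing information: which Queues the Topic has and which Brokers those Queues live on
- It then opens a long-lived connection to the Master that serves the Topic and sends heartbeats to that Master periodically
Consumer
- The message consumer
- It fetches the Topic's routing information from the Name Server cluster and connects to the corresponding Brokers to consume
- Messages can be read from both Master and Slave, so the Consumer connects to both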
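Name Server
- Built on Netty, it provides route management, service registration, and service discovery, and it is a stateless node
- The Name Server handles service discovery: Brokers must report their status periodically so that the other roles can find them, and a Broker that stops reporting past the timeout is removed from the Name Server's list. Producers and Consumers do not register there; they fetch routing information from it, as described above
- Several Name Servers can be deployed. In that case each Broker reports its status to all of them, which keeps the service highly available
- Name Server nodes do not talk to each other, and there is no primary/backup concept
- The Name Server stores data in memory; Broker and Topic information is not persisted by default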
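The "report periodically, get removed on timeout" behaviour can be sketched in a few lines. This is a simplified in-memory model under my own assumptions, not the Name Server's actual implementation: the class `RouteRegistry`, its method names, and the timeout values are all hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of an in-memory registry with heartbeat expiry.
final class RouteRegistry {
    // broker address -> timestamp of the last heartbeat; memory only, nothing is persisted
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    RouteRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called whenever a broker registers or reports again. */
    void heartbeat(String brokerAddr, long nowMillis) {
        lastSeenMillis.put(brokerAddr, nowMillis);
    }

    /** Removes every broker whose last heartbeat is older than the timeout; returns how many were removed. */
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeenMillis.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastSeenMillis.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        RouteRegistry registry = new RouteRegistry(1_000); // timeout value chosen only for the demo
        registry.heartbeat("192.168.1.101:10911", 0);
        System.out.println("evicted: " + registry.evictExpired(1_500));
        System.out.println("alive: " + registry.isAlive("192.168.1.101:10911"));
    }
}
```

Because every node would keep such a table independently, the nodes need no communication among themselves, which matches the "no primary/backup" point in the list.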
Installation
    ## Download
    wget https://archive.apache.org/dist/rocketmq/4.6.1/rocketmq-all-4.6.1-bin-release.zip
    ## Unzip
    unzip rocketmq-all-4.6.1-bin-release.zip
Change into the bin directory of the extracted RocketMQ:
    [root@localhost rocketmq-all-4.6.1-bin-release]# cd bin/
    [root@localhost bin]# ll
    total 108
    -rwxr-xr-x 1 root root 1654 Nov 28 2019 cachedog.sh
    -rwxr-xr-x 1 root root 845 Nov 28 2019 cleancache.sh
    -rwxr-xr-x 1 root root 1116 Nov 28 2019 cleancache.v1.sh
    drwxr-xr-x 2 root root 25 Nov 28 2019 dledger
    -rwxr-xr-x 1 root root 1398 Nov 28 2019 mqadmin
    -rwxr-xr-x 1 root root 1029 Nov 28 2019 mqadmin.cmd
    -rwxr-xr-x 1 root root 1394 Nov 28 2019 mqbroker
    -rwxr-xr-x 1 root root 1084 Nov 28 2019 mqbroker.cmd
    -rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode0
    -rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode1
    -rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode2
    -rwxr-xr-x 1 root root 1373 Nov 28 2019 mqbroker.numanode3
    -rwxr-xr-x 1 root root 1396 Nov 28 2019 mqnamesrv
    -rwxr-xr-x 1 root root 1088 Nov 28 2019 mqnamesrv.cmd
    -rwxr-xr-x 1 root root 1571 Nov 28 2019 mqshutdown
    -rwxr-xr-x 1 root root 1398 Nov 28 2019 mqshutdown.cmd
    -rwxr-xr-x 1 root root 2222 Nov 28 2019 os.sh
    -rwxr-xr-x 1 root root 1148 Nov 28 2019 play.cmd
    -rwxr-xr-x 1 root root 1008 Nov 28 2019 play.sh
    -rwxr-xr-x 1 root root 772 Nov 28 2019 README.md
    -rwxr-xr-x 1 root root 2206 Nov 28 2019 runbroker.cmd
    -rwxr-xr-x 1 root root 3713 Dec 24 2019 runbroker.sh
    -rwxr-xr-x 1 root root 1816 Nov 28 2019 runserver.cmd
    -rwxr-xr-x 1 root root 3397 Dec 24 2019 runserver.sh
    -rwxr-xr-x 1 root root 1156 Nov 28 2019 setcache.sh
    -rwxr-xr-x 1 root root 1408 Nov 28 2019 startfsrv.sh
    -rwxr-xr-x 1 root root 1634 Dec 24 2019 tools.cmd
    -rwxr-xr-x 1 root root 1901 Dec 24 2019 tools.sh
The two scripts that need to be started are mqbroker and mqnamesrv.
Start mqnamesrv first. It prints the following log:
    [root@localhost bin]# ./mqnamesrv
    Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
    Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006c0000000, 2147483648, 0) failed; error='Cannot allocate memory' (errno=12)
    #
    # There is insufficient memory for the Java Runtime Environment to continue.
    # Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
    # An error report file with more information is saved as:
    # /root/rocketmq-all-4.6.1-bin-release/bin/hs_err_pid2154.log
The service failed to start. The first log line warns that combining the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release. The second line says the same about the CMS option UseCMSCompactAtFullCollection.
Neither warning is the reason for the failure. The cause is in the third line, error='Cannot allocate memory' (errno=12): memory could not be allocated. The first half of that line shows that the JVM tried to commit 2147483648 bytes (2 GB). The virtual machine used for this article has only 1 GB of memory, so the startup failed.
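As a quick check of the arithmetic, the byte count from the log can be converted and compared with the machine's memory. The class `MemoryCheck` is a hypothetical helper written for this article, and the 1 GB figure is the VM size stated above.

```java
// Hypothetical helper: converts the byte count from the JVM error and checks it against the VM's memory.
final class MemoryCheck {
    static final long GIB = 1024L * 1024L * 1024L;

    /** Converts a byte count to GiB. */
    static double toGiB(long bytes) {
        return (double) bytes / GIB;
    }

    /** True when the requested allocation could fit into the available memory at all. */
    static boolean fits(long requestedBytes, long availableBytes) {
        return requestedBytes <= availableBytes;
    }

    public static void main(String[] args) {
        long requested = 2147483648L; // value taken from the os::commit_memory log line
        long available = 1 * GIB;     // the 1 GB virtual machine used in this article
        System.out.println("requested GiB: " + toGiB(requested));
        System.out.println("fits: " + fits(requested, available));
    }
}
```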
The startup parameters therefore have to be changed. Here is the mqnamesrv script:
    if [ -z "$ROCKETMQ_HOME" ] ; then
      ## resolve links - $0 may be a link to maven's home
      PRG="$0"

      # need this for relative symlinks
      while [ -h "$PRG" ] ; do
        ls=`ls -ld "$PRG"`
        link=`expr "$ls" : '.*-> \(.*\)$'`
        if expr "$link" : '/.*' > /dev/null; then
          PRG="$link"
        else
          PRG="`dirname "$PRG"`/$link"
        fi
      done

      saveddir=`pwd`

      ROCKETMQ_HOME=`dirname "$PRG"`/..

      # make it fully qualified
      ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

      cd "$saveddir"
    fi

    export ROCKETMQ_HOME

    sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
The script itself sets no memory parameters, but its last line runs another script, runserver.sh, so that is the file to edit. Opening runserver.sh reveals the JVM memory parameters.
Change those parameters to values that fit the machine.
Start mqnamesrv again and it prints the following log:
    [root@localhost bin]# ./mqnamesrv
    Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
    Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    The Name Server boot success. serializeType=JSON
The Name Server is now running.
Starting mqbroker likewise requires changing the corresponding parameters in the same way (mqbroker delegates to runbroker.sh). After a successful start it prints the following log:
    [root@localhost bin]# ./mqbroker -n 192.168.1.101:9876 autoCreateTopicEnable=true
    The broker[localhost.localdomain, 192.168.1.101:10911] boot success. serializeType=JSON and name server is 192.168.1.101:9876
-n specifies which Name Server to connect to, and autoCreateTopicEnable=true makes the Broker create a Topic automatically when it does not exist.
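Testing
Synchronous messages
Messages sent this way are reliable: the send call blocks until the Broker responds, so the caller knows whether the message was delivered and can react to a failure.
Producer code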
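    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    /**
     * @author sicimike
     * @create 2020-07-27 22:36
     */
    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            // Set the Name Server address
            producer.setNamesrvAddr("192.168.1.101:9876");
            // Start the Producer
            producer.start();
            // The message object
            Message message = new Message("hello-topic", "Hello RocketMQ".getBytes());
            // Send a synchronous message
            SendResult sendResult = producer.send(message);
            System.out.println("sendResult is : " + sendResult);
            // Close the connection
            producer.shutdown();
        }
    }
Consumer code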
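    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;

    /**
     * @author sicimike
     * @create 2020-07-27 22:37
     */
    public class SicimikeConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
            // Set the Name Server address
            consumer.setNamesrvAddr("192.168.1.101:9876");
            // Subscribe to the topic; the second argument is the message filter, "*" means no filtering
            consumer.subscribe("hello-topic", "*");
            // Register the message listener
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        // Several messages may arrive in one call
                        System.out.println("message is : " + new String(messageExt.getBody()));
                    }
                    // Report that the messages have been consumed
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // Start the Consumer
            consumer.start();
        }
    }
Batch messages
RocketMQ can pack several messages together and send them in one go, which reduces the number of network round trips and improves throughput.
Producer code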
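    import java.util.ArrayList;
    import java.util.List;

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            producer.setNamesrvAddr("192.168.1.101:9876");
            producer.start();
            Message message1 = new Message("hello-topic", "Hello RocketMQ-1".getBytes());
            Message message2 = new Message("hello-topic", "Hello RocketMQ-2".getBytes());
            Message message3 = new Message("hello-topic", "Hello RocketMQ-3".getBytes());
            List<Message> messageList = new ArrayList<>();
            messageList.add(message1);
            messageList.add(message2);
            messageList.add(message3);
            // Send all three messages as one batch
            SendResult sendResult = producer.send(messageList);
            System.out.println("sendResult is : " + sendResult);
            producer.shutdown();
        }
    }
Batch sending comes with some restrictions: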
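- All messages in a batch must share the same topic and the same message configuration
- Delayed messages are not supported
- A single batch should preferably stay below 1 MB
- If it is unclear whether the limit will be exceeded, compute the sizes manually and send the messages in several batches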
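The last point, computing sizes and splitting manually, might look like the sketch below. It works on raw message bodies only and uses no RocketMQ classes. The class `BatchSplitter` and the `perMessageOverhead` parameter (a rough allowance for topic, properties, and framing) are assumptions for illustration; the real on-wire size of a message depends on more than its body.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: greedily packs message bodies into batches that stay under a size limit.
final class BatchSplitter {

    static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes, int perMessageOverhead) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            int size = body.length + perMessageOverhead;
            // Close the current batch when adding this message would exceed the limit
            if (!current.isEmpty() && currentSize + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            // A single message larger than the limit still ends up alone in its own batch
            current.add(body);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            bodies.add(("Hello RocketMQ-" + i).getBytes(StandardCharsets.UTF_8));
        }
        // A tiny 40-byte limit so the split is visible; 1 MB would be 1024 * 1024
        System.out.println("batches: " + split(bodies, 40, 0).size());
    }
}
```

Each resulting sub-list would then be wrapped into Message objects and passed to one producer.send(...) call, as in the Producer code above.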
Asynchronous messages
Asynchronous messages are typically used in business scenarios that are sensitive to response time.
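    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            producer.setNamesrvAddr("192.168.1.101:9876");
            producer.start();
            // How many times to retry a failed asynchronous send (not counting the first attempt)
            producer.setRetryTimesWhenSendAsyncFailed(0);
            Message message = new Message("hello-topic", "Hello RocketMQ asynchronous".getBytes());
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Called back after a successful send
                    System.out.println("sendResult is : " + sendResult);
                    // Close the connection
                    producer.shutdown();
                }

                @Override
                public void onException(Throwable e) {
                    // Called back when an exception is thrown
                    System.out.println("Exception : " + e);
                }
            });
        }
    }
One-way messages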
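One-way messages suit cases that need only moderate reliability, such as log collection. A one-way send does not wait for any result, so it cannot confirm that the message was delivered, but it is fast.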
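    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            producer.setNamesrvAddr("192.168.1.101:9876");
            producer.start();
            Message message = new Message("hello-topic", "Hello RocketMQ One way".getBytes());
            // Send a one-way message
            producer.sendOneway(message);
        }
    }
Broadcast messages
Unlike the modes above, whether a message is broadcast is decided by the consumer:
- Clustering consumption: within a Consumer cluster, only one Consumer of the same Group consumes a given message
- Broadcasting consumption: every Consumer subscribed to the Topic consumes the message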
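The difference between the two modes can be modelled without a Broker. The sketch below is a simplification under my own assumptions: the class `ConsumeModes` is hypothetical, and picking the clustering receiver by message index is only a stand-in, since the real client distributes Queues among the Consumers of a group rather than individual messages. In the RocketMQ 4.x client the mode is, to my knowledge, selected on the consumer with `setMessageModel(MessageModel.BROADCASTING)` or `MessageModel.CLUSTERING`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of who receives a message in each consumption mode.
final class ConsumeModes {
    enum Mode { CLUSTERING, BROADCASTING }

    /** Returns the members of one consumer group that would receive the message. */
    static List<String> receivers(Mode mode, List<String> groupMembers, int messageIndex) {
        if (groupMembers.isEmpty()) {
            return new ArrayList<>();
        }
        if (mode == Mode.BROADCASTING) {
            // Every subscribed consumer gets its own copy
            return new ArrayList<>(groupMembers);
        }
        // Clustering: exactly one member of the group handles the message
        List<String> one = new ArrayList<>();
        one.add(groupMembers.get(messageIndex % groupMembers.size()));
        return one;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
        System.out.println(receivers(Mode.CLUSTERING, group, 4));
        System.out.println(receivers(Mode.BROADCASTING, group, 4));
    }
}
```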
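Transactional messages
A transactional message can be viewed as a two-phase-commit message implementation that ensures eventual consistency in a distributed system. It guarantees that executing the local transaction and sending the message happen atomically.
The flow for sending a transactional message is as follows. The producer first sends a Half Message to the Broker's Half Queue; a message in this state cannot be consumed. Once that send succeeds, the local transaction is executed. If the local transaction commits normally, RocketMQ turns the Half Message into a normal message and delivers it; if the local transaction rolls back, the Half Message is deleted. If neither a commit nor a rollback arrives within the timeout, RocketMQ periodically checks the state of the local transaction to decide whether the message should be committed or rolled back.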
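The decision logic of that flow can be written down as a small state sketch. It is a model only, not RocketMQ code: the class `HalfMessageFlow` and its enums are hypothetical. In the real 4.x client the producer side is, as far as I know, implemented with `TransactionMQProducer` and a `TransactionListener`, whose two callbacks correspond to the "first answer" and the "check-back answers" below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of how a half message is resolved.
final class HalfMessageFlow {
    enum LocalTxState { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DISCARDED, PENDING }

    /**
     * @param first      state reported right after the local transaction ran
     * @param checkBacks states reported on each later status check, in order
     */
    static Outcome resolve(LocalTxState first, List<LocalTxState> checkBacks) {
        LocalTxState state = first;
        int i = 0;
        // Keep checking while the producer has given no final answer
        while (state == LocalTxState.UNKNOWN && i < checkBacks.size()) {
            state = checkBacks.get(i++);
        }
        switch (state) {
            case COMMIT:
                return Outcome.DELIVERED;  // half message becomes a normal, consumable message
            case ROLLBACK:
                return Outcome.DISCARDED;  // half message is deleted
            default:
                return Outcome.PENDING;    // still undecided, stays invisible to consumers
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(LocalTxState.COMMIT, new ArrayList<LocalTxState>()));
        System.out.println(resolve(LocalTxState.UNKNOWN, Arrays.asList(LocalTxState.UNKNOWN, LocalTxState.ROLLBACK)));
    }
}
```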
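Ordered messages
RocketMQ provides ordered messages using FIFO order. Ordered messages are either globally ordered or partition-ordered.
Ordering has two sides, ordered sending and ordered consuming. Only when both are guaranteed can the messages be called ordered.
In RocketMQ every Topic has several Queues, and a Queue is FIFO. To send messages in order it is therefore enough to send them to the same Queue, and RocketMQ provides an API for exactly that.
Producer code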
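    import java.util.List;

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            producer.setNamesrvAddr("192.168.1.101:9876");
            producer.start();
            for (int i = 0; i < 10; i++) {
                // All messages go to the same Topic
                Message message = new Message("hello-topic", ("Hello RocketMQ Order " + i).getBytes());
                // MessageQueueSelector chooses the Queue
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        // Pick the Queue based on the external argument
                        return mqs.get(id % mqs.size());
                    }
                }, 1234);
                System.out.println("sendResult is : " + sendResult);
            }
        }
    }
Consuming in order on the consumer side is just as simple, and RocketMQ provides an API for that as well.
Consumer code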
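    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;

    public class SicimikeConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
            consumer.setNamesrvAddr("192.168.1.101:9876");
            consumer.subscribe("hello-topic", "*");
            // MessageListenerOrderly requests ordered consumption
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt messageExt : msgs) {
                        System.out.println(Thread.currentThread().getName() + " -> " + new String(messageExt.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        }
    }
Consumer output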
    ConsumeMessageThread_1 -> Hello RocketMQ Order 0
    ConsumeMessageThread_1 -> Hello RocketMQ Order 1
    ConsumeMessageThread_1 -> Hello RocketMQ Order 2
    ConsumeMessageThread_1 -> Hello RocketMQ Order 3
    ConsumeMessageThread_1 -> Hello RocketMQ Order 4
    ConsumeMessageThread_1 -> Hello RocketMQ Order 5
    ConsumeMessageThread_1 -> Hello RocketMQ Order 6
    ConsumeMessageThread_1 -> Hello RocketMQ Order 7
    ConsumeMessageThread_1 -> Hello RocketMQ Order 8
    ConsumeMessageThread_1 -> Hello RocketMQ Order 9
MessageListenerOrderly makes sure that a given Queue is consumed by only one thread at a time, so the messages within a single Queue are processed in order.
The following example verifies this: the Producer starts two threads that write messages to two different Queues.
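    import java.util.List;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;

    public class SicimikeProducer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("hello-group-producer");
            producer.setNamesrvAddr("192.168.1.101:9876");
            producer.start();
            // First thread: every message is routed with the argument 1234
            new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        Message message = new Message("hello-topic", ("Hello RocketMQ Order 1234-" + i).getBytes());
                        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                            @Override
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                Integer id = (Integer) arg;
                                return mqs.get(id % mqs.size());
                            }
                        }, 1234);
                        System.out.println(Thread.currentThread().getName() + " -> sendResult is : " + sendResult);
                    }
                } catch (Exception e) {
                    // Print the failure instead of swallowing it silently
                    e.printStackTrace();
                }
            }).start();
            // Second thread: every message is routed with the argument 4321
            new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        Message message = new Message("hello-topic", ("Hello RocketMQ Order 4321-" + i).getBytes());
                        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                            @Override
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                Integer id = (Integer) arg;
                                return mqs.get(id % mqs.size());
                            }
                        }, 4321);
                        System.out.println(Thread.currentThread().getName() + " -> sendResult is : " + sendResult);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
Output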
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0001, offsetMsgId=C0A8016500002A9F0000000000013EA1, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=115]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0000, offsetMsgId=C0A8016500002A9F0000000000013DBD, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=18]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0003, offsetMsgId=C0A8016500002A9F0000000000013F85, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=19]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0002, offsetMsgId=C0A8016500002A9F0000000000014069, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=116]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863C60004, offsetMsgId=C0A8016500002A9F000000000001414D, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=117]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863C60005, offsetMsgId=C0A8016500002A9F0000000000014231, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=20]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863CB0006, offsetMsgId=C0A8016500002A9F0000000000014315, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=118]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863CD0007, offsetMsgId=C0A8016500002A9F00000000000143F9, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=21]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D30008, offsetMsgId=C0A8016500002A9F00000000000144DD, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=119]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D50009, offsetMsgId=C0A8016500002A9F00000000000145C1, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=22]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D8000A, offsetMsgId=C0A8016500002A9F00000000000146A5, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=120]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D9000B, offsetMsgId=C0A8016500002A9F0000000000014789, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=23]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E0000C, offsetMsgId=C0A8016500002A9F000000000001486D, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=121]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E6000D, offsetMsgId=C0A8016500002A9F0000000000014951, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=24]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E8000E, offsetMsgId=C0A8016500002A9F0000000000014A35, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=122]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E9000F, offsetMsgId=C0A8016500002A9F0000000000014B19, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=25]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863ED0010, offsetMsgId=C0A8016500002A9F0000000000014BFD, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=123]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F40011, offsetMsgId=C0A8016500002A9F0000000000014CE1, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=26]
    Thread-6 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F50012, offsetMsgId=C0A8016500002A9F0000000000014DC5, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=2], queueOffset=124]
    Thread-7 -> sendResult is : SendResult [sendStatus=SEND_OK, msgId=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F80013, offsetMsgId=C0A8016500002A9F0000000000014EA9, messageQueue=MessageQueue [topic=hello-topic, brokerName=localhost.localdomain, queueId=1], queueOffset=27]
The output shows that each thread writes to its own Queue in order: the queueOffset grows by one with every send. Now for the consumer:
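    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;

    public class SicimikeConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello-group-consumer");
            consumer.setNamesrvAddr("192.168.1.101:9876");
            consumer.subscribe("hello-topic", "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt messageExt : msgs) {
                        // Print the full message, including queueId and queueOffset
                        System.out.println(Thread.currentThread().getName() + " -> " + messageExt.toString());
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        }
    }
Output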
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=115, sysFlag=0, bornTimestamp=1596467602352, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602468, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000013EA1, commitLogOffset=81569, bodyCRC=194455642, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0001, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 48], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=18, sysFlag=0, bornTimestamp=1596467602352, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602467, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000013DBD, commitLogOffset=81341, bodyCRC=1481455063, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863AE0000, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 48], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=19, sysFlag=0, bornTimestamp=1596467602365, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602482, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000013F85, commitLogOffset=81797, bodyCRC=793380161, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0003, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 49], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=116, sysFlag=0, bornTimestamp=1596467602365, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602482, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014069, commitLogOffset=82025, bodyCRC=2089818316, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863BD0002, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 49], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=20, sysFlag=0, bornTimestamp=1596467602374, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602486, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014231, commitLogOffset=82481, bodyCRC=910382331, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863C60005, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 50], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=21, sysFlag=0, bornTimestamp=1596467602381, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602496, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F00000000000143F9, commitLogOffset=82937, bodyCRC=1095001197, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863CD0007, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 51], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=117, sysFlag=0, bornTimestamp=1596467602374, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602486, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F000000000001414D, commitLogOffset=82253, bodyCRC=1704544630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863C60004, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 50], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=22, sysFlag=0, bornTimestamp=1596467602389, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602500, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F00000000000145C1, commitLogOffset=83393, bodyCRC=1595994574, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D50009, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 52], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=118, sysFlag=0, bornTimestamp=1596467602379, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602491, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014315, commitLogOffset=82709, bodyCRC=312375776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863CB0006, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 51], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=23, sysFlag=0, bornTimestamp=1596467602393, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602506, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014789, commitLogOffset=83849, bodyCRC=673694040, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D9000B, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 53], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=119, sysFlag=0, bornTimestamp=1596467602387, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602498, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F00000000000144DD, commitLogOffset=83165, bodyCRC=217771075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D30008, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 52], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=24, sysFlag=0, bornTimestamp=1596467602406, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602517, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014951, commitLogOffset=84305, bodyCRC=825135330, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E6000D, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 54], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=120, sysFlag=0, bornTimestamp=1596467602392, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602506, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F00000000000146A5, commitLogOffset=83621, bodyCRC=2080234709, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863D8000A, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 53], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=25, sysFlag=0, bornTimestamp=1596467602409, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602524, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014B19, commitLogOffset=84761, bodyCRC=1177133172, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E9000F, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 55], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=121, sysFlag=0, bornTimestamp=1596467602400, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602514, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F000000000001486D, commitLogOffset=84077, bodyCRC=1660194159, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E0000C, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 54], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=26, sysFlag=0, bornTimestamp=1596467602420, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602531, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014CE1, commitLogOffset=85217, bodyCRC=1452719589, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F40011, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 56], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=122, sysFlag=0, bornTimestamp=1596467602408, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602519, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014A35, commitLogOffset=84533, bodyCRC=368295417, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863E8000E, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 55], transactionId='null'}]
    ConsumeMessageThread_1 -> MessageExt [queueId=1, storeSize=228, queueOffset=27, sysFlag=0, bornTimestamp=1596467602424, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602535, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014EA9, commitLogOffset=85673, bodyCRC=563187059, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F80013, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 52, 51, 50, 49, 45, 57], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=123, sysFlag=0, bornTimestamp=1596467602413, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602525, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014BFD, commitLogOffset=84989, bodyCRC=88907880, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863ED0010, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 56], transactionId='null'}]
    ConsumeMessageThread_2 -> MessageExt [queueId=2, storeSize=228, queueOffset=124, sysFlag=0, bornTimestamp=1596467602421, bornHost=/192.168.1.33:9717, storeTimestamp=1596467602532, storeHost=/192.168.1.101:10911, msgId=C0A8016500002A9F0000000000014DC5, commitLogOffset=85445, bodyCRC=1917554942, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='hello-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=125, UNIQ_KEY=24098A4C0A222600A1B5C95D6F74641E000018B4AAC20F4863F50012, CLUSTER=DefaultCluster, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 79, 114, 100, 101, 114, 32, 49, 50, 51, 52, 45, 57], transactionId='null'}]
The messages of each Queue are consumed in order: within one queueId the queueOffset values appear strictly ascending.
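The queue IDs seen in the output (queueId=2 for the 1234 messages, queueId=1 for the 4321 messages) follow from the modulo rule in the selector, assuming the Topic has 4 Queues and the list index equals the queueId. Both are assumptions on my part; the article does not state the Queue count. The sketch below is standalone and the class `QueueIndex` is hypothetical. It uses Math.floorMod instead of the plain % operator so that a negative key, for example one derived from hashCode(), still yields a valid index.

```java
// Hypothetical helper: maps an ordering key to a queue index by modulo.
final class QueueIndex {

    /** floorMod keeps the result in [0, queueCount) even for negative keys. */
    static int select(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of Queues for hello-topic
        System.out.println("1234 -> queue " + select(1234, queueCount));
        System.out.println("4321 -> queue " + select(4321, queueCount));
    }
}
```

All messages that share a key therefore land in the same Queue, which is what makes per-key ordering possible.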
Summary
This article covered installing RocketMQ, its basic concepts, and the basic ways of using it.