RocketMQ 是什么 Github 上關于 RocketMQ 的介紹:RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:
支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞 支持拉(pull)和推(push)兩種消息模式 單一隊列百萬消息的堆積能力 支持多種消息協(xié)議,如 JMS、MQTT 等 分布式高可用的部署架構,滿足至少一次消息傳遞語義 提供 docker 鏡像用于隔離測試和云集群部署 提供配置、指標和監(jiān)控等功能豐富的 Dashboard 對于這些特性描述,大家簡單過一眼就即可,深入學習之后自然就明白了。
專業(yè)術語 Producer 消息生產(chǎn)者,生產(chǎn)者的作用就是將消息發(fā)送到 MQ,生產(chǎn)者本身既可以產(chǎn)生消息,如讀取文本信息等。也可以對外提供接口,由外部應用來調用接口,再由生產(chǎn)者將收到的消息發(fā)送到 MQ。
Producer Group 生產(chǎn)者組,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組。在這里可以不用關心,只要知道有這么一個概念即可。
Consumer 消息消費者,簡單來說,消費 MQ 上的消息的應用程序就是消費者,至于消息是否進行邏輯處理,還是直接存儲到數(shù)據(jù)庫等取決于業(yè)務需要。
Consumer Group 消費者組,和生產(chǎn)者類似,消費同一類消息的多個 consumer 實例組成一個消費者組。
Topic Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。
Message Message 是消息的載體。一個 Message 必須指定 topic,相當于寄信的地址。Message 還有一個可選的 tag 設置,以便消費端可以基于 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業(yè)務 key 來查找 broker 上的消息,方便在開發(fā)過程中診斷問題。
Tag 標簽可以被認為是對 Topic 進一步細化。一般在相同業(yè)務模塊中通過引入標簽來標記不同用途的消息。
Broker Broker 是 RocketMQ 系統(tǒng)的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產(chǎn)者的消息,儲存以及為消費者拉取消息的請求做好準備。
Name Server Name Server 為 producer 和 consumer 提供路由信息。
RocketMQ 架構 RocketMQ 架構
由這張圖可以看到有四個集群,分別是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:
NameServer: 提供輕量級的服務發(fā)現(xiàn)和路由。 每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務,并支持快速存儲擴展。 Broker: 通過提供輕量級的 Topic 和 Queue 機制來處理消息存儲,同時支持推(push)和拉(pull)模式以及主從結構的容錯機制。 Producer:生產(chǎn)者,產(chǎn)生消息的實例,擁有相同 Producer Group 的 Producer 組成一個集群。 Consumer:消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的Consumer 組成一個集群。 簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行數(shù)據(jù)同步,即 Date Sync。同時每個 Broker 與NameServer 集群中的所有節(jié)點建立長連接,定時注冊 Topic 信息到所有 NameServer 中。
Producer 與 NameServer 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 NameServer 獲取 Topic 路由信息,并向提供 Topic 服務的 Broker Master 建立長連接,且定時向 Broker 發(fā)送心跳。Producer 只能將消息發(fā)送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave建立長連接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。
RocketMQ 集群部署模式 單 master 模式也就是只有一個 master 節(jié)點,稱不上是集群,一旦這個 master 節(jié)點宕機,那么整個服務就不可用,適合個人學習使用。 多 master 模式多個 master 節(jié)點組成集群,單個 master 節(jié)點宕機或者重啟對應用沒有影響。優(yōu)點:所有模式中性能最高缺點:單個 master 節(jié)點宕機期間,未被消費的消息在節(jié)點恢復之前不可用,消息的實時性就受到影響。注意 :使用同步刷盤可以保證消息不丟失,同時 Topic 相對應的 queue 應該分布在集群中各個節(jié)點,而不是只在某各節(jié)點上,否則,該節(jié)點宕機會對訂閱該 topic 的應用造成影響。 多 master 多 slave 異步復制模式在多 master 模式的基礎上,每個 master 節(jié)點都有至少一個對應的 slave。master節(jié)點可讀可寫,但是 slave 只能讀不能寫,類似于 mysql 的主備模式。優(yōu)點: 在 master 宕機時,消費者可以從 slave 讀取消息,消息的實時性不會受影響,性能幾乎和多 master 一樣。缺點:使用異步復制的同步方式有可能會有消息丟失的問題。 多 master 多 slave 同步雙寫模式同多 master 多 slave 異步復制模式類似,區(qū)別在于 master 和 slave 之間的數(shù)據(jù)同步方式。優(yōu)點:同步雙寫的同步模式能保證數(shù)據(jù)不丟失。缺點:發(fā)送單個消息 RT 會略長,性能相比異步復制低10%左右。刷盤策略:同步刷盤和異步刷盤(指的是節(jié)點自身數(shù)據(jù)是同步還是異步存儲)同步方式:同步雙寫和異步復制(指的一組 master 和 slave 之間數(shù)據(jù)的同步)注意 :要保證數(shù)據(jù)可靠,需采用同步刷盤和同步雙寫的方式,但性能會較其他方式低。 RocketMQ 單主部署 鑒于是快速入門,我選擇的是第一種單 master 的部署模式。先說明一下我的安裝環(huán)境:
Centos 7.2 jdk 1.8 Maven 3.2.x Git 這里 git 可用可不用,主要是用來直接下載 github 上的源碼。也可以選擇自己到github 上下載,然后上傳到服務器上。以git操作為示例。
clone 源碼并用 maven 編譯 > git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
> cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
> cd target/alibaba-rocketmq-broker/alibaba-rocketmq
此處可能遇到的問題 一、執(zhí)行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"時出現(xiàn)以下提示:
fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error
解決辦法:一般是由于網(wǎng)絡原因造成的,執(zhí)行以下命令
> ping github.com
確定可以 ping 通之后,再重新執(zhí)行 git clone 命令。 二、執(zhí)行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"編譯時,可能出現(xiàn)下載相關jar很慢的情況。 這也是由于默認 maven 中央倉庫在國外的原因,可以根據(jù)需要在 /home/maven/conf/setting.xml 中的 <mirrors></mirrors> 添加以下內(nèi)容后重新編譯:
<mirror><id>aliyun</id><mirrorOf>central</mirrorOf><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
啟動 Name Server > nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執(zhí)行 jps 查看進程
> jps
25913 NamesrvStartup
//查看日志確保服務已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動 broker > nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
//執(zhí)行 jps 查看進程
> jps
25954 BrokerStartup
//查看日志確保服務已正常啟動
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 10.1.54.121:10911] boot success...
發(fā)送和接收消息 發(fā)送/接收消息之前,我們需要告訴客戶端 NameServer 地址。RocketMQ 提供了多種方式來實現(xiàn)這一目標。為簡單起見,我們使用環(huán)境變量 NAMESRV_ADDR。 > export NAMESRV_ADDR=localhost:9876
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
關閉服務 > sh /opt/RocketMQ/bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh /opt/RocketMQ/bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
生產(chǎn)者、消費者 Demo 生產(chǎn)者 public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//聲明并初始化一個producer//需要一個producer group名字作為構造方法的參數(shù),這里為producer1DefaultMQProducer producer = new DefaultMQProducer("producer1");//設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設置,不一定非得寫死在代碼里producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//調用start()方法啟動一個producer實例producer.start();//發(fā)送10條消息到Topic為TopicTest,tag為TagA,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);//調用producer的send()方法發(fā)送消息//這里調用的是同步的方式,所以會有返回結果SendResult sendResult = producer.send(msg);//打印返回結果,可以看到消息發(fā)送的狀態(tài)以及一些相關信息System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//發(fā)送完消息之后,調用shutdown()方法關閉producerproducer.shutdown();}
}
消費者 public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//聲明并初始化一個consumer//需要一個consumer group名字作為構造方法的參數(shù),這里為consumer1DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");//同樣也要設置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設置的是一個consumer的消費策略//CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設置consumer所訂閱的Topic和Tag,*代表全部的Tagconsumer.subscribe("TopicTest", "*");//設置一個Listener,主要進行消息的邏輯處理consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費狀態(tài)//CONSUME_SUCCESS 消費成功//RECONSUME_LATER 消費失敗,需要稍后重新消費return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");}
}
作者:馮先生的筆記
鏈接:http://www.jianshu.com/p/824066d70da8
來源:簡書
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。作者:馮先生的筆記鏈接:http://www.jianshu.com/p/824066d70da8來源:簡書著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
總結
以上是生活随笔 為你收集整理的RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔 推薦給好友。