志宇-RocketMQ学习
RocketMQ
- RocketMQ安裝
- RocketMQ-console安裝
- RocketMQ簡單使用
- RabbitMQ核心概念
- 消息發(fā)送狀態(tài)(返回對象中的枚舉類型有4種)
- 重試次數(shù)
- RocketMQ發(fā)送消息三種方式
- RocketMQ延遲消息設(shè)置
- 消息順序消費
- 消費端配置參數(shù)詳情
- tag標(biāo)簽
- 消費模式
- Offset和CommitLog
- 事務(wù)使用
- 集群部署
RocketMQ安裝
官方安裝方法
先后安裝Name Server(起到路由功能)和 Broker(RocketMQ服務(wù)) 然后測試下發(fā)送和接收可用
內(nèi)存不夠在/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin目錄下修改runbroker.sh tools.sh runserver.sh這三個文件中JAVA_OPT參數(shù),修改完如下
#僅僅修改已經(jīng)存在的配置即可,將4g換成256m或者128m #runbroker.sh broker占用的內(nèi)存大小 #tools.sh 測試發(fā)送和接收工具的內(nèi)存大小 #runserver.sh 路由的內(nèi)存和大小 JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"RocketMQ-console安裝
找到rocketmq-console/src/main/resources/application.properties 修改配置
#console端口 server.port=17890 #name server地址 rocketmq.config.namesrvAddr=localhost:9876找到rocketmq-console目錄下的pom.xml文件,修改配置
<rocketmq.version>4.4.0XXX</rocketmq.version> 4.4.0XXX 修改為 4.4.0(你的RocketMQ版本) <rocketmq.version>4.4.0</rocketmq.version>進(jìn)入rocketmq-console目錄,編譯打包 mvn clean package -Dmaven.test.skip=true
進(jìn)入target目錄 ,啟動 java -jar rocketmq-console-ng-1.0.0.jar
守護(hù)進(jìn)程方式啟動 nohup java -jar rocketmq-console-ng-1.0.0.jar &
RocketMQ簡單使用
常見錯誤一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout解決辦法
因為服務(wù)器中可能有多塊網(wǎng)卡,rocketmq要指定公網(wǎng)ip
在配置文件rocketmq-all-4.8.0-source-release/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中添加
brokerIP1=39.107.109.94此ip為公網(wǎng)ip
然后重新啟動 ,啟動命令如下:
jps 找到啟動進(jìn)程,kill -9 關(guān)閉進(jìn)程,然后已守護(hù)進(jìn)程啟動
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
常見錯誤二
MQClientException: No route info of this topic, TopicTest1解決辦法
來到/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0目錄下
查看已經(jīng)配置信息sh bin/mqbroker -m 配置信息如下
修改配置文件distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中autoCreateTopicEnable=true,true 則代表可以自動創(chuàng)建topic,生產(chǎn)上要關(guān)閉,如果這個參數(shù)是true還創(chuàng)建不了topic可能是因為程序引入jar包的版本和RabbitMQ版本不同導(dǎo)致的
常用配置說明
compressMsgBodyOverHowmuch :消息超過默認(rèn)字節(jié)4096后進(jìn)行壓縮 retryTimesWhenSendFailed : 失敗重發(fā)次數(shù) maxMessageSize : 最大消息配置,默認(rèn)128k topicQueueNums : 主題下面的隊列數(shù)量,默認(rèn)是4 autoCreateTopicEnable : 是否自動創(chuàng)建主題Topic, 開發(fā)建議為true,生產(chǎn)要為false defaultTopicQueueNums : 自動創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊列數(shù) autoCreateSubscriptionGroup: 是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開發(fā)開啟,線上關(guān)閉 brokerClusterName : 集群名稱 brokerId : 0表示Master主節(jié)點 大于0表示從節(jié)點 brokerIP1 : Broker服務(wù)地址 brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE deleteWhen : 每天執(zhí)行刪除過期文件的時間,默認(rèn)每天凌晨4點 flushDiskType :刷盤策略, 默認(rèn)為 ASYNC_FLUSH(異步刷盤), 另外是SYNC_FLUSH(同步刷盤) listenPort : Broker監(jiān)聽的端口號 mapedFileSizeCommitLog : 單個conmmitlog文件大小,默認(rèn)是1GB mapedFileSizeConsumeQueue:ConsumeQueue每個文件默認(rèn)存30W條,可以根據(jù)項目調(diào)整 storePathRootDir : 存儲消息以及一些配置信息的根目錄 默認(rèn)為用戶的 ${HOME}/store storePathCommitLog:commitlog存儲目錄默認(rèn)為${storePathRootDir}/commitlog storePathIndex: 消息索引存儲路徑 syncFlushTimeout : 同步刷盤超時時間 diskMaxUsedSpaceRatio : 檢測可用的磁盤空間大小,超過后會寫入報錯(磁盤沒有滿卻寫入不進(jìn)RabbitMQ是就有可能是因為這個參數(shù),這個參數(shù)就是當(dāng)磁盤還剩余百分之多少時就不允許在寫入消息了)常見錯誤三
控制臺查看不了數(shù)據(jù),提示連接 10909錯誤,因為RocketMQ自帶VIP虛擬ip技術(shù),這時要防火墻要開放10909端口,才能使用
RabbitMQ核心概念
消息發(fā)送狀態(tài)(返回對象中的枚舉類型有4種)
生產(chǎn)者發(fā)送完信息返回的類型(返回類型SendResult中的SendStatus成員變量)
FLUSH_DISK_TIMEOUT
在指定時間沒有將消息同步到磁盤中(刷盤策略需要為SYNC_FLUSH 才會出這個錯誤)
例如:在cpu爆滿時候?qū)е聸]有刷盤成功
FLUSH_SLAVE_TIMEOUT
主從模式下,broker是SYNC_MASTER, 沒有在規(guī)定時間內(nèi)完成主從同步
例如:網(wǎng)路原因等,導(dǎo)致主從同步?jīng)]有成功,如果是異步復(fù)制則不會出現(xiàn)這個問題
SLAVE_NOT_AVAILABLE
從模式下,broker是SYNC_MASTER, 但是沒有找到被配置成Slave的Broker
例如:主從模式下,所有從節(jié)點宕機,如果是異步復(fù)制則不會出現(xiàn)這個問題
SEND_OK
發(fā)送成功,沒有發(fā)生上面的三種問題
重試次數(shù)
生產(chǎn)者重試:RocketMQ中默認(rèn)次數(shù)是2次,一般不是跨國調(diào)用不用修改重試次數(shù)了
重試次數(shù)設(shè)置方法如下
異步發(fā)送不會重試,需自己書寫代碼重試
//這里會開啟一個線程異步發(fā)送消息 produceProxy.getProducet().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//處理消息}@Overridepublic void onException(Throwable e) {//進(jìn)行重試} });消費者重試:默認(rèn)次數(shù)16次,當(dāng)網(wǎng)絡(luò)中斷、消費者報錯、ack確認(rèn)失敗導(dǎo)致重試
手動設(shè)置重試次數(shù)方法如下
RocketMQ發(fā)送消息三種方式
同步發(fā)送: 發(fā)送驗證碼,郵件通知中使用,速快快,當(dāng)前線程反饋,可靠
異步發(fā)送:注冊發(fā)送優(yōu)惠券中使用,速快快,非前線程反饋,可靠
OneWay: 日志采集中使用,速度最快,沒有反饋,相對不可靠
RocketMQ延遲消息設(shè)置
開源版本不支持定時發(fā)送,只支持固定的發(fā)送時間
在源碼中rocketmq-store 項目> MessageStoreConfig.java >的成員變量 messageDelayLevel中有固定的延遲時間
發(fā)送消息時設(shè)定延遲等級
Message message = new Message("topicName","TagA","sendMessage"); //0等級代表不延遲 message.setDelayTimeLevel(0) //1代表延遲一秒鐘 message.setDelayTimeLevel(1) //2代表延遲5秒鐘 //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 以此類推 message.setDelayTimeLevel(2)消息順序消費
全局順序消費:例如比特幣交易過程中人民幣換成美元時候按照從低到高的順序
局部順序消費:例如 訂單狀態(tài)的消息提醒
局部順序的使用說明:
1、RocketMQ上默認(rèn)一個topic上有4個queue,順序消費要將消息投遞到同一個topic對應(yīng)的同一個隊列中(根據(jù)業(yè)務(wù)id取模然后投遞到同一個隊列上,通過MessageQueueSelector實現(xiàn)類實現(xiàn))
2、順序消息暫不支持廣播模式、異步發(fā)送方式
3、順序消費支持多個消費者進(jìn)行消費(消費者消費前會對消費的隊列加鎖)
4、消費者部署的節(jié)點數(shù)要小于此topic對應(yīng)的queue的數(shù)量(消費數(shù)量會均等分給消費者,不然有的消費者收不到消息)
5、消費者單線程處理,使用MessageListenerOrderly替代MessageListenerConcurrently
發(fā)送消息偽代碼如下
接收消息偽代碼如下
//這里使用MessageListenerOrderly替代MessageListenerConcurrently(使用單線程消費) consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);String body = new String(messageExt.getBody(), "utf-8");System.out.println(body);return ConsumeOrderlyStatus.SUCCESS;}} );消費端配置參數(shù)詳情
//設(shè)置組 一個項目中一個組對應(yīng)一個消費者 consumer = new DefaultMQPushConsumer(RocketMQConfig.groupName); //設(shè)置地址 consumer.setNamesrvAddr(RocketMQConfig.serverAddresses); //設(shè)置訂閱對應(yīng)的topic consumer.subscribe(RocketMQConfig.topicName, "*"); //設(shè)置默認(rèn)消費隊列中最后一個,默認(rèn)也是這個配置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //消費者均等消費策略 ,默認(rèn)也是這個配置 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); //設(shè)置存儲在本地還是遠(yuǎn)程,默認(rèn)廣播存儲在消費者本地、其他存儲在遠(yuǎn)程 //consumer.setOffsetStore(new RemoteBrokerOffsetStore()); //設(shè)置消費者最大線程數(shù) consumer.setConsumeThreadMax(100); //設(shè)置消費者最小線程數(shù) consumer.setConsumeThreadMin(5); //消費者一次從mq中拉取多少條數(shù)據(jù) consumer.setPullBatchSize(32); //拉取后每次消費多少條 consumer.setConsumeMessageBatchMaxSize(1); //設(shè)置為廣播模式,如果是廣播模式則重試次數(shù)失效 //consumer.setMessageModel(MessageModel.BROADCASTING); //設(shè)置為集群模式,默認(rèn)是集群模式 consumer.setMessageModel(MessageModel.CLUSTERING);tag標(biāo)簽
消費者可以選擇消費 某個Group 中的某個 topic 中的指定Tag
1、消費者手動過濾Tag (沒有用到的tag也進(jìn)行傳輸浪費資源)
2、RocketMQ選擇發(fā)送給消費者
消費模式
1、Broker獲得消息然后將消息發(fā)送給消費者(消費者壓力大)
2、消費者間隔去向Broker拉取(沒有消息也會去拉取、間隔時間不好設(shè)置)
3、Broker和消費者之間每15秒發(fā)起一次長連接(默認(rèn))
Offset和CommitLog
CommitLog用于存儲發(fā)送的消息內(nèi)容
Offset 用于存儲消息存儲在隊列中的下標(biāo)
CommitLog默認(rèn)存儲位置 根目錄下store/consumequeue/{topicName}/{queueid}/fileName
事務(wù)使用
集群部署
推薦同步雙寫、異步刷盤
總結(jié)
以上是生活随笔為你收集整理的志宇-RocketMQ学习的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html embed用法 Embed
- 下一篇: 考研数学若干题解析