RocketMQ單機(jī)支持1萬以上的持久化隊(duì)列,前提是足夠的內(nèi)存、硬盤空間,過期數(shù)據(jù)數(shù)據(jù)刪除(RocketMQ中的消息隊(duì)列長度不是無限的,只是足夠大的內(nèi)存+數(shù)據(jù)定時(shí)刪除)
RocketMQ版本:3.1.4
?
一,部署NameServer:
1,安裝JDK并設(shè)置JAVA_HOME環(huán)境變量(啟動(dòng)腳本依賴JAVA_HOME環(huán)境變量)
2,cd /alibaba-rocketmq/bin進(jìn)入RocketMQ的bin目錄
2,調(diào)用nohup sh mqnamesrv &啟動(dòng)NameServer
報(bào)錯(cuò)如下:
[plain]?view plain
?copy ? :?command?not?found?? :?command?not?found?? mqnamesrv:?line?35:?syntax?error:?unexpected?end?of?file??
在bin目錄下調(diào)用dos2unix *將所有文件轉(zhuǎn)化為unix格式,再次調(diào)用nohup sh mqnamesrv &
報(bào)錯(cuò)如下:
[plain]?view plain
?copy ? /home/hadoop/alibaba-rocketmq?? Invalid?initial?heap?size:?-Xms4g?? The?specified?size?exceeds?the?maximum?representable?size.?? Could?not?create?the?Java?virtual?machine.??
由于安裝的JDK版本為32位,4g超過了JDK所支持的最大內(nèi)存,不過32位JDK也無法發(fā)揮出RocketMQ的優(yōu)勢,換成64位JDK
這次啟動(dòng)成功
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqnamesrv?&?? [1]?17676?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? The?Name?Server?boot?success.?? [hadoop@hadoop?bin]$?jps?? 17682?NamesrvStartup?? 17800?Jps??
NameServer監(jiān)聽端口:9876
[java]?view plain
?copy ? nettyServerConfig.setListenPort(9876);??
如果服務(wù)器內(nèi)存不夠,可以修改runserver.sh腳本(mqnamesrv文件中通過runserver.sh腳本調(diào)用Name Server的主函數(shù)com.alibaba.rocketmq.namesrv.NamesrvStartup啟動(dòng)Name Server)中的JAVA_OPT_1參數(shù)
[plain]?view plain
?copy ? JAVA_OPT_1="-server?-Xms4g?-Xmx4g?-Xmn2g?-XX:PermSize=128m?-XX:MaxPermSize=320m"??
二,部署B(yǎng)roker:消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息
Broker集群有多種配置方式:
1,單Master
??? 優(yōu)點(diǎn):除了配置簡單沒什么優(yōu)點(diǎn)
??? 缺點(diǎn):不可靠,該機(jī)器重啟或宕機(jī),將導(dǎo)致整個(gè)服務(wù)不可用
2,多Master
??? 優(yōu)點(diǎn):配置簡單,性能最高
??? 缺點(diǎn):可能會(huì)有少量消息丟失(配置相關(guān)),單臺機(jī)器重啟或宕機(jī)期間,該機(jī)器下未被消費(fèi)的消息在機(jī)器恢復(fù)前不可訂閱,影響消息實(shí)時(shí)性
3,多Master多Slave,每個(gè)Master配一個(gè)Slave,有多對Master-Slave,集群采用異步復(fù)制方式,主備有短暫消息延遲,毫秒級
??? 優(yōu)點(diǎn):性能同多Master幾乎一樣,實(shí)時(shí)性高,主備間切換對應(yīng)用透明,不需人工干預(yù)
??? 缺點(diǎn):Master宕機(jī)或磁盤損壞時(shí)會(huì)有少量消息丟失
4,多Master多Slave,每個(gè)Master配一個(gè)Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應(yīng)用返回成功
??? 優(yōu)點(diǎn):服務(wù)可用性與數(shù)據(jù)可用性非常高
??? 缺點(diǎn):性能比異步集群略低,當(dāng)前版本主宕備不能自動(dòng)切換為主
Master和Slave的配置文件參考conf目錄下的配置文件
Master與Slave通過指定相同的brokerName參數(shù)來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大于0的數(shù)
一個(gè)Master下面可以掛載多個(gè)Slave,同一Master下的多個(gè)Slave通過指定不同的BrokerId來區(qū)分
部署一Master一Slave,集群采用異步復(fù)制方式:
Master:
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqbroker?-n?"192.168.58.163:9876"?-c?../conf/2m-2s-async/broker-a.properties?&?? [2]?25493?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? load?config?properties?file?OK,?../conf/2m-2s-async/broker-a.properties?? The?broker[broker-a,?192.168.58.163:10911]?boot?success.?and?name?server?is?192.168.58.163:9876?? [hadoop@hadoop?bin]$?jps?? 25500?BrokerStartup?? 25545?Jps?? 17682?NamesrvStartup??
Slave:
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqbroker?-n?"192.168.58.163:9876"?-c?../conf/2m-2s-async/broker-a-s.properties?&?? [1]?1974?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? load?config?properties?file?OK,?../conf/2m-2s-async/broker-a-s.properties?? The?broker[broker-a,?192.168.58.164:10911]?boot?success.?and?name?server?is?192.168.58.163:9876?? [hadoop@hadoop?bin]$?jps?? 2071?Jps?? 1981?BrokerStartup??
Broker監(jiān)聽端口:10911
[java]?view plain
?copy ? nettyServerConfig.setListenPort(10911);??
如果服務(wù)器內(nèi)存不夠,可以修改runbroker.sh腳本(mqbroker文件中通過runbroker.sh腳本調(diào)用Broker的主函數(shù)com.alibaba.rocketmq.broker.BrokerStartup啟動(dòng)Broker)的JAVA_OPT_1參數(shù)
[plain]?view plain
?copy ? JAVA_OPT_1="-server?-Xms4g?-Xmx4g?-Xmn2g?-XX:PermSize=128m?-XX:MaxPermSize=320m"??
三,Producer
必須要設(shè)置Name Server地址
[java]?view plain
?copy ? package?com.sean;?? ?? import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;?? import?com.alibaba.rocketmq.client.producer.SendResult;?? import?com.alibaba.rocketmq.common.message.Message;?? ?? public?class?Producer?{?? ????public?static?void?main(String[]?args){?? ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");?? ????????producer.setNamesrvAddr("192.168.58.163:9876");??? ????????try?{?? ????????????producer.start();?? ?????????????? ????????????Message?msg?=?new?Message("PushTopic",??? ????????????????????"push",??? ????????????????????"1",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????SendResult?result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ?????????????? ????????????msg?=?new?Message("PushTopic",??? ????????????????????"push",??? ????????????????????"2",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ?????????????? ????????????msg?=?new?Message("PullTopic",??? ????????????????????"pull",??? ????????????????????"1",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ????????}?catch?(Exception?e)?{?? ????????????e.printStackTrace();?? ????????}finally{?? ????????????producer.shutdown();?? ????????}?? ????}?? }??
四,Consumer
必須要設(shè)置Name Server地址
[java]?view plain
?copy ? package?com.sean;?? ?? import?java.util.List;?? ?? import?com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;?? import?com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;?? import?com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;?? import?com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;?? import?com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;?? import?com.alibaba.rocketmq.common.message.Message;?? import?com.alibaba.rocketmq.common.message.MessageExt;?? ?? public?class?Consumer?{?? ????public?static?void?main(String[]?args){?? ????????DefaultMQPushConsumer?consumer?=??? ????????????????new?DefaultMQPushConsumer("PushConsumer");?? ????????consumer.setNamesrvAddr("192.168.58.163:9876");??? ????????try?{?? ?????????????? ????????????consumer.subscribe("PushTopic",?"push");?? ?????????????? ????????????consumer.setConsumeFromWhere(?? ????????????????????ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);?? ????????????consumer.registerMessageListener(?? ????????????????new?MessageListenerConcurrently()?{?? ????????????????????public?ConsumeConcurrentlyStatus?consumeMessage(?? ????????????????????????????List<MessageExt>?list,?? ????????????????????????????ConsumeConcurrentlyContext?Context)?{?? ????????????????????????Message?msg?=?list.get(0);?? ????????????????????????System.out.println(msg.toString());?? ????????????????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;?? ????????????????????}?? ????????????????}?? ????????????);?? ????????????consumer.start();?? ????????}?catch?(Exception?e)?{?? ????????????e.printStackTrace();?? ????????}?? ????}?? }??
先運(yùn)行Consumer,然后運(yùn)行Producer
Producer運(yùn)行結(jié)果:
[plain]?view plain
?copy ? id:C0A83AA300002A9F00000000000009EA?result:SEND_OK?? id:C0A83AA300002A9F0000000000000A77?result:SEND_OK?? id:C0A83AA300002A9F0000000000000B04?result:SEND_OK??
Consumer運(yùn)行結(jié)果:
MessageExt [queueId=1, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668792, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527374, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F0000000000000A77, commitLogOffset=2679, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=2, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]
MessageExt [queueId=0, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668698, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527356, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F00000000000009EA, commitLogOffset=2538, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=1, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]
總結(jié)
以上是生活随笔為你收集整理的阿里RocketMQ Quick Start的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。