activemq部署安装
一、架構(gòu)和技術(shù)介紹
1、簡介
ActiveMQ?是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。完全支持JMS1.1和J2EE 1.4規(guī)范的?JMS Provider實現(xiàn)
2、activemq的特性
1.?多種語言和協(xié)議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.?完全支持JMS1.1和J2EE 1.4規(guī)范?(持久化,XA消息,事務(wù))
3.?對Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
4.?通過了常見J2EE服務(wù)器(如?Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resourceadaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業(yè)服務(wù)器上
5.?支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.?支持通過JDBC和journal提供高速的消息持久化
7.?從設(shè)計上保證了高性能的集群,客戶端-服務(wù)器,點對點
8.?支持Ajax
9.?支持與Axis的整合
10.?可以很容易得調(diào)用內(nèi)嵌JMS provider,進(jìn)行測試
3、下載和安裝ActiveMQ
1、下載
ActiveMQ的最新版本是5.10.0,但由于我們內(nèi)網(wǎng)下載存在問題,所以目前通過內(nèi)網(wǎng)只能下載到5.9.0,下載地址:http://activemq.apache.org/activemq-590-release.html。
2、安裝
?????????如果是在windows系統(tǒng)中運行,可以直接解壓apache-activemq-5.9.0-bin.zip,并運行bin目錄下的activemq.bat文件,此時使用的是默認(rèn)的服務(wù)端口:61616和默認(rèn)的console端口:8161。
?????????如果是在linux或unix下運行,在bin目錄下執(zhí)行命令:./activemq setup
3、修改ActiveMQ的服務(wù)端口和console端口
???????? A、修改服務(wù)端口:打開conf/activemq.xml文件,修改以下紅色字體部分
????????<transportConnectors>
???????????<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>
???????</transportConnectors>
?
B、修改console的地址和端口:打開conf/jetty.xml文件,修改以下紅色字體部分
????<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">
???????<property name="port" value="8162"/>
?</bean>
4、通過客戶端代碼試用ActiveMQ
??????? 需要提前將activemq解壓包中的lib目錄下的相關(guān)包引入到工程中,再進(jìn)行如下編碼:
1、發(fā)送端的代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.DeliveryMode;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass?Sender {
????privatestaticfinalintSEND_NUMBER?= 5;
?
????publicstaticvoid?main(String[] args) {
????????// ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接
????????ConnectionFactory connectionFactory;
????????// Connection:JMS客戶端到JMS Provider的連接
????????Connection connection =?null;
????????// Session:一個發(fā)送或接收消息的線程
????????Session session;
????????// Destination:消息的目的地;消息發(fā)送給誰.
????????Destination destination;
????????// MessageProducer:消息發(fā)送者
????????MessageProducer producer;
????????// TextMessage message;
????????//構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn)jar
????????connectionFactory =?new?ActiveMQConnectionFactory(
????????????????ActiveMQConnection.DEFAULT_USER,
????????????????ActiveMQConnection.DEFAULT_PASSWORD,
????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
????????try?{
????????????//構(gòu)造從工廠得到連接對象
????????????connection =connectionFactory.createConnection();
????????????//啟動
????????????connection.start();
????????????//獲取操作連接
????????????session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
????????????//獲取session
????????????destination = session.createQueue("FirstQueue");
????????????//得到消息生成者【發(fā)送者】
????????????producer =session.createProducer(destination);
????????????//設(shè)置不持久化,此處學(xué)習(xí),實際根據(jù)項目決定
???????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
????????????//構(gòu)造消息,此處寫死,項目就是參數(shù),或者方法獲取
????????????sendMessage(session, producer);
????????????session.commit();
????????}?catch?(Exception e) {
????????????e.printStackTrace();
????????}?finally?{
????????????try?{
????????????????if?(null?!= connection)
????????????????????connection.close();
????????????}?catch?(Throwable ignore) {
????????????}
????????}
????????}
???
????publicstaticvoid?sendMessage(Session session,MessageProducer producer)
????????????throws?Exception {
????????for?(int?i = 1; i <=SEND_NUMBER; i++) {
????????????TextMessage message = session
????????????????????.createTextMessage("ActiveMq發(fā)送的消息"?+ i);
????????????//發(fā)送消息到目的地方
????????????System.out.println("發(fā)送消息:"?+?"ActiveMq?發(fā)送的消息"?+ i);
????????????producer.send(message);
????????}
????}
}
?
2、接收端代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
?
publicclass?Receive {
????publicstaticvoid?main(String[] args) {
????????// ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接
????????ConnectionFactory connectionFactory;
????????// Connection:JMS客戶端到JMS Provider的連接
????????Connection connection =?null;
????????// Session:一個發(fā)送或接收消息的線程
????????Session session;
????????// Destination:消息的目的地;消息發(fā)送給誰.
????????Destination destination;
????????//消費者,消息接收者
????????MessageConsumer consumer;
????????connectionFactory =?new?ActiveMQConnectionFactory(
????????????????ActiveMQConnection.DEFAULT_USER,
????????????????ActiveMQConnection.DEFAULT_PASSWORD,
????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
????????try?{
????????????//構(gòu)造從工廠得到連接對象
????????????connection =connectionFactory.createConnection();
????????????//啟動
????????????connection.start();
????????????//獲取操作連接
????????????session = connection.createSession(false,
????????????????????Session.AUTO_ACKNOWLEDGE);
????????????//獲取session
????????????destination = session.createQueue("FirstQueue");
????????????consumer =session.createConsumer(destination);
????????????while?(true) {
????????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里誰定為100s
????????????????TextMessage message =(TextMessage) consumer.receive(100000);
????????????????if?(null?!= message) {
????????????????????System.out.println("收到消息"?+ message.getText());
????????????????}?else?{
????????????????????break;
????????????????}
????????????}
????????}?catch?(Exception e) {
????????????e.printStackTrace();
????????}?finally?{
????????????try?{
????????????????if?(null?!= connection)
????????????????????connection.close();
????????????}?catch?(Throwable ignore) {
????????????}
????????}
????}
}
3、通過監(jiān)控查看消息堆棧的記錄:
??????登陸http://localhost:8162/admin/queues.jsp,默認(rèn)的用戶名和密碼:admin/admin
二、ActiveMQ的多種部署方式
?????????單點的ActiveMQ作為企業(yè)應(yīng)用無法滿足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之后我認(rèn)為需要將兩種部署方式相結(jié)合才能滿足我們公司分布式和高可用的需求,所以后面就重點將解如何將兩種部署方式相結(jié)合。
1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式
?????????主要是通過共享存儲目錄來實現(xiàn)master和slave的熱備,所有的ActiveMQ應(yīng)用都在不斷地獲取共享目錄的控制權(quán),哪個應(yīng)用搶到了控制權(quán),它就成為master。
?????????多個共享存儲目錄的應(yīng)用,誰先啟動,誰就可以最早取得共享目錄的控制權(quán)成為master,其他的應(yīng)用就只能作為slave。
2)shared database Master-Slave方式
?????????與shared filesystem方式類似,只是共享的存儲介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫而已。
3)Replicated LevelDB Store方式
?????????這種主備方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協(xié)調(diào)選擇一個node作為master。被選擇的master broker node開啟并接受客戶端連接。
其他node轉(zhuǎn)入slave模式,連接master并同步他們的存儲狀態(tài)。slave不接受客戶端連接。所有的存儲操作都將被復(fù)制到連接至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網(wǎng)絡(luò)中并連接master進(jìn)入slave mode。所有需要同步的disk的消息操作都將等待存儲狀態(tài)被復(fù)制到其他法定節(jié)點的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master將會存儲并更新然后等待?(2-1)=1個slave存儲和更新完成,才匯報success。至于為什么是2-1,熟悉Zookeeper的應(yīng)該知道,有一個node要作為觀擦者存在。
單一個新的master被選中,你需要至少保障一個法定node在線以能夠找到擁有最新狀態(tài)的node。這個node將會成為新的master。因此,推薦運行至少3個replica nodes,以防止一個node失敗了,服務(wù)中斷。
2、Broker-Cluster部署方式
?????????前面的Master-Slave的方式雖然能解決多服務(wù)熱備的高可用問題,但無法解決負(fù)載均衡和分布式的問題。Broker-Cluster的部署方式就可以解決負(fù)載均衡的問題。
???????? Broker-Cluster部署方式中,各個broker通過網(wǎng)絡(luò)互相連接,并共享queue。當(dāng)broker-A上面指定的queue-A中接收到一個message處于pending狀態(tài),而此時沒有consumer連接broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的消息,那么broker-B會先通過內(nèi)部網(wǎng)絡(luò)獲取到broker-A上面的message,并通知自己的consumer來消費。
1)static Broker-Cluster部署
?????????在activemq.xml文件中靜態(tài)指定Broker需要建立橋連接的其他Broker:
1、??首先在Broker-A節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、??修改Broker-A節(jié)點中的服務(wù)提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、??在Broker-B節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
? ????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、??修改Broker-A節(jié)點中的服務(wù)提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分別啟動Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
?????????在activemq.xml文件中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動后動態(tài)查找:
1、??首先在Broker-A節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnectoruri="multicast://default"
???????????dynamicOnly="true"
???????????networkTTL="3"
???????????prefetchSize="1"
???????????decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A節(jié)點中的服務(wù)提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnectoruri="multicast://default"
???????????dynamicOnly="true"
???????????networkTTL="3"
???????????prefetchSize="1"
???????????decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B節(jié)點中的服務(wù)提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、啟動Broker-A和Broker-B
2、Master-Slave與Broker-Cluster相結(jié)合的部署方式
?????????可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負(fù)載均衡,Broker-Cluster解決了負(fù)載均衡,但當(dāng)其中一個Broker突然宕掉的話,那么存在于該Broker上處于Pending狀態(tài)的message將會丟失,無法達(dá)到高可用的目的。
?????????由于目前ActiveMQ官網(wǎng)上并沒有一個明確的將兩種部署方式相結(jié)合的部署方案,所以我嘗試者把兩者結(jié)合起來部署:
?????????
1、部署的配置修改
?????????這里以Broker-A + Broker-B建立cluster,Broker-C作為Broker-B的slave為例:
1)首先在Broker-A節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnector?? uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A節(jié)點中的服務(wù)提供端口為61616:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B節(jié)點中的服務(wù)提供端口為61617:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B節(jié)點中的持久化方式:
??????<persistenceAdapter>
???????????<kahaDB directory="/localhost/kahadb"/>
????? ??</persistenceAdapter>
6)在Broker-C節(jié)點中添加networkConnector節(jié)點:
<networkConnectors>?
??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C節(jié)點中的服務(wù)提供端口為61618:
<transportConnectors>
?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B節(jié)點中的持久化方式:
??????<persistenceAdapter>
???????????<kahaDB directory="/localhost/kahadb"/>
???????</persistenceAdapter>
9)分別啟動broker-A、broker-B、broker-C,因為是broker-B先啟動,所以“/localhost/kahadb”目錄被lock住,broker-C將一直處于掛起狀態(tài),當(dāng)人為停掉broker-B之后,broker-C將獲取目錄“/localhost/kahadb”的控制權(quán),重新與broker-A組成cluster提供服務(wù)。
總結(jié)
以上是生活随笔為你收集整理的activemq部署安装的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: node.js安装部署测试
- 下一篇: nginx 并发过十万