日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

activemq部署安装

發(fā)布時間:2023/11/30 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activemq部署安装 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、架構(gòu)和技術(shù)介紹

1、簡介

ActiveMQ?Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。完全支持JMS1.1J2EE 1.4規(guī)范的?JMS Provider實現(xiàn)

2、activemq的特性

1.?多種語言和協(xié)議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2.?完全支持JMS1.1J2EE 1.4規(guī)范?(持久化,XA消息,事務(wù))

3.?Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性

4.?通過了常見J2EE服務(wù)器(?Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resourceadaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業(yè)服務(wù)器上

5.?支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6.?支持通過JDBCjournal提供高速的消息持久化

7.?從設(shè)計上保證了高性能的集群,客戶端-服務(wù)器,點對點

8.?支持Ajax

9.?支持與Axis的整合

10.?可以很容易得調(diào)用內(nèi)嵌JMS provider,進(jìn)行測試

3、下載和安裝ActiveMQ

1、下載

ActiveMQ的最新版本是5.10.0,但由于我們內(nèi)網(wǎng)下載存在問題,所以目前通過內(nèi)網(wǎng)只能下載到5.9.0,下載地址:http://activemq.apache.org/activemq-590-release.html

2、安裝

?????????如果是在windows系統(tǒng)中運行,可以直接解壓apache-activemq-5.9.0-bin.zip,并運行bin目錄下的activemq.bat文件,此時使用的是默認(rèn)的服務(wù)端口:61616和默認(rèn)的console端口:8161

?????????如果是在linuxunix下運行,在bin目錄下執(zhí)行命令:./activemq setup

3、修改ActiveMQ的服務(wù)端口和console端口

???????? A、修改服務(wù)端口:打開conf/activemq.xml文件,修改以下紅色字體部分

????????<transportConnectors>

???????????<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>

???????</transportConnectors>

?

B、修改console的地址和端口:打開conf/jetty.xml文件,修改以下紅色字體部分

????<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">

???????<property name="port" value="8162"/>

?</bean>

4、通過客戶端代碼試用ActiveMQ

??????? 需要提前將activemq解壓包中的lib目錄下的相關(guān)包引入到工程中,再進(jìn)行如下編碼:

1、發(fā)送端的代碼:

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.DeliveryMode;

importjavax.jms.Destination;

importjavax.jms.MessageProducer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnection;

importorg.apache.activemq.ActiveMQConnectionFactory;

publicclass?Sender {

????privatestaticfinalintSEND_NUMBER?= 5;

?

????publicstaticvoid?main(String[] args) {

????????// ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接

????????ConnectionFactory connectionFactory;

????????// ConnectionJMS客戶端到JMS Provider的連接

????????Connection connection =?null;

????????// Session一個發(fā)送或接收消息的線程

????????Session session;

????????// Destination:消息的目的地;消息發(fā)送給誰.

????????Destination destination;

????????// MessageProducer:消息發(fā)送者

????????MessageProducer producer;

????????// TextMessage message;

????????//構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn)jar

????????connectionFactory =?new?ActiveMQConnectionFactory(

????????????????ActiveMQConnection.DEFAULT_USER,

????????????????ActiveMQConnection.DEFAULT_PASSWORD,

????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");

????????try?{

????????????//構(gòu)造從工廠得到連接對象

????????????connection =connectionFactory.createConnection();

????????????//啟動

????????????connection.start();

????????????//獲取操作連接

????????????session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

????????????//獲取session

????????????destination = session.createQueue("FirstQueue");

????????????//得到消息生成者【發(fā)送者】

????????????producer =session.createProducer(destination);

????????????//設(shè)置不持久化,此處學(xué)習(xí),實際根據(jù)項目決定

???????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

????????????//構(gòu)造消息,此處寫死,項目就是參數(shù),或者方法獲取

????????????sendMessage(session, producer);

????????????session.commit();

????????}?catch?(Exception e) {

????????????e.printStackTrace();

????????}?finally?{

????????????try?{

????????????????if?(null?!= connection)

????????????????????connection.close();

????????????}?catch?(Throwable ignore) {

????????????}

????????}

????????}

???

????publicstaticvoid?sendMessage(Session session,MessageProducer producer)

????????????throws?Exception {

????????for?(int?i = 1; i <=SEND_NUMBER; i++) {

????????????TextMessage message = session

????????????????????.createTextMessage("ActiveMq發(fā)送的消息"?+ i);

????????????//發(fā)送消息到目的地方

????????????System.out.println("發(fā)送消息:"?+?"ActiveMq?發(fā)送的消息"?+ i);

????????????producer.send(message);

????????}

????}

}

?

2、接收端代碼:

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.Destination;

importjavax.jms.MessageConsumer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnection;

importorg.apache.activemq.ActiveMQConnectionFactory;

?

publicclass?Receive {

????publicstaticvoid?main(String[] args) {

????????// ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接

????????ConnectionFactory connectionFactory;

????????// ConnectionJMS客戶端到JMS Provider的連接

????????Connection connection =?null;

????????// Session一個發(fā)送或接收消息的線程

????????Session session;

????????// Destination:消息的目的地;消息發(fā)送給誰.

????????Destination destination;

????????//消費者,消息接收者

????????MessageConsumer consumer;

????????connectionFactory =?new?ActiveMQConnectionFactory(

????????????????ActiveMQConnection.DEFAULT_USER,

????????????????ActiveMQConnection.DEFAULT_PASSWORD,

????????????????"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");

????????try?{

????????????//構(gòu)造從工廠得到連接對象

????????????connection =connectionFactory.createConnection();

????????????//啟動

????????????connection.start();

????????????//獲取操作連接

????????????session = connection.createSession(false,

????????????????????Session.AUTO_ACKNOWLEDGE);

????????????//獲取session

????????????destination = session.createQueue("FirstQueue");

????????????consumer =session.createConsumer(destination);

????????????while?(true) {

????????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里誰定為100s

????????????????TextMessage message =(TextMessage) consumer.receive(100000);

????????????????if?(null?!= message) {

????????????????????System.out.println("收到消息"?+ message.getText());

????????????????}?else?{

????????????????????break;

????????????????}

????????????}

????????}?catch?(Exception e) {

????????????e.printStackTrace();

????????}?finally?{

????????????try?{

????????????????if?(null?!= connection)

????????????????????connection.close();

????????????}?catch?(Throwable ignore) {

????????????}

????????}

????}

}

3、通過監(jiān)控查看消息堆棧的記錄:

??????登陸http://localhost:8162/admin/queues.jsp,默認(rèn)的用戶名和密碼:admin/admin

二、ActiveMQ的多種部署方式

?????????單點的ActiveMQ作為企業(yè)應(yīng)用無法滿足高可用和集群的需求,所以ActiveMQ提供了master-slavebroker cluster等多種部署方式,但通過分析多種部署方式之后我認(rèn)為需要將兩種部署方式相結(jié)合才能滿足我們公司分布式和高可用的需求,所以后面就重點將解如何將兩種部署方式相結(jié)合。

1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式

?????????主要是通過共享存儲目錄來實現(xiàn)masterslave的熱備,所有的ActiveMQ應(yīng)用都在不斷地獲取共享目錄的控制權(quán),哪個應(yīng)用搶到了控制權(quán),它就成為master

?????????多個共享存儲目錄的應(yīng)用,誰先啟動,誰就可以最早取得共享目錄的控制權(quán)成為master,其他的應(yīng)用就只能作為slave

2)shared database Master-Slave方式

?????????shared filesystem方式類似,只是共享的存儲介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫而已。

3)Replicated LevelDB Store方式

?????????這種主備方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協(xié)調(diào)選擇一個node作為master。被選擇的master broker node開啟并接受客戶端連接。

其他node轉(zhuǎn)入slave模式,連接master并同步他們的存儲狀態(tài)。slave不接受客戶端連接。所有的存儲操作都將被復(fù)制到連接至Masterslaves

如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網(wǎng)絡(luò)中并連接master進(jìn)入slave mode。所有需要同步的disk的消息操作都將等待存儲狀態(tài)被復(fù)制到其他法定節(jié)點的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master將會存儲并更新然后等待?(2-1)=1slave存儲和更新完成,才匯報success。至于為什么是2-1,熟悉Zookeeper的應(yīng)該知道,有一個node要作為觀擦者存在。

單一個新的master被選中,你需要至少保障一個法定node在線以能夠找到擁有最新狀態(tài)的node。這個node將會成為新的master。因此,推薦運行至少3replica nodes,以防止一個node失敗了,服務(wù)中斷。

2、Broker-Cluster部署方式

?????????前面的Master-Slave的方式雖然能解決多服務(wù)熱備的高可用問題,但無法解決負(fù)載均衡和分布式的問題。Broker-Cluster的部署方式就可以解決負(fù)載均衡的問題。

???????? Broker-Cluster部署方式中,各個broker通過網(wǎng)絡(luò)互相連接,并共享queue。當(dāng)broker-A上面指定的queue-A中接收到一個message處于pending狀態(tài),而此時沒有consumer連接broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的消息,那么broker-B會先通過內(nèi)部網(wǎng)絡(luò)獲取到broker-A上面的message,并通知自己的consumer來消費。

1)static Broker-Cluster部署

?????????activemq.xml文件中靜態(tài)指定Broker需要建立橋連接的其他Broker

1、??首先在Broker-A節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>

</networkConnectors>

2、??修改Broker-A節(jié)點中的服務(wù)提供端口為61616

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3、??Broker-B節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

? ????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4、??修改Broker-A節(jié)點中的服務(wù)提供端口為61617

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5、分別啟動Broker-ABroker-B

2)Dynamic Broker-Cluster部署

?????????activemq.xml文件中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動后動態(tài)查找:

1、??首先在Broker-A節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnectoruri="multicast://default"

???????????dynamicOnly="true"

???????????networkTTL="3"

???????????prefetchSize="1"

???????????decreaseNetworkConsumerPriority="true" />

</networkConnectors>

2、修改Broker-A節(jié)點中的服務(wù)提供端口為61616

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>

</transportConnectors>

3、在Broker-B節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnectoruri="multicast://default"

???????????dynamicOnly="true"

???????????networkTTL="3"

???????????prefetchSize="1"

???????????decreaseNetworkConsumerPriority="true" />

</networkConnectors>

4、修改Broker-B節(jié)點中的服務(wù)提供端口為61617

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>

</transportConnectors>

5、啟動Broker-ABroker-B

2、Master-Slave與Broker-Cluster相結(jié)合的部署方式

?????????可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負(fù)載均衡,Broker-Cluster解決了負(fù)載均衡,但當(dāng)其中一個Broker突然宕掉的話,那么存在于該Broker上處于Pending狀態(tài)的message將會丟失,無法達(dá)到高可用的目的。

?????????由于目前ActiveMQ官網(wǎng)上并沒有一個明確的將兩種部署方式相結(jié)合的部署方案,所以我嘗試者把兩者結(jié)合起來部署:

?????????

1、部署的配置修改

?????????這里以Broker-A + Broker-B建立clusterBroker-C作為Broker-Bslave為例:

1)首先在Broker-A節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnector?? uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>

</networkConnectors>

2)修改Broker-A節(jié)點中的服務(wù)提供端口為61616

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3)Broker-B節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4)修改Broker-B節(jié)點中的服務(wù)提供端口為61617

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5)修改Broker-B節(jié)點中的持久化方式:

??????<persistenceAdapter>

???????????<kahaDB directory="/localhost/kahadb"/>

????? ??</persistenceAdapter>

6)Broker-C節(jié)點中添加networkConnector節(jié)點:

<networkConnectors>?

??????????????? <networkConnector?? uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

7)修改Broker-C節(jié)點中的服務(wù)提供端口為61618

<transportConnectors>

?? ??????<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

8)修改Broker-B節(jié)點中的持久化方式:

??????<persistenceAdapter>

???????????<kahaDB directory="/localhost/kahadb"/>

???????</persistenceAdapter>

9)分別啟動broker-A、broker-B、broker-C,因為是broker-B先啟動,所以“/localhost/kahadb”目錄被lock住,broker-C將一直處于掛起狀態(tài),當(dāng)人為停掉broker-B之后,broker-C將獲取目錄“/localhost/kahadb”的控制權(quán),重新與broker-A組成cluster提供服務(wù)。

總結(jié)

以上是生活随笔為你收集整理的activemq部署安装的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。