日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Activemq-In-action(三)

發(fā)布時(shí)間:2024/4/13 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Activemq-In-action(三) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 第三部分 Activemq應(yīng)用
    • 創(chuàng)建Java應(yīng)用
      • 集成broker
        • 嵌入broker
          • Broker Service
          • Broker Factory
        • 使用spring集成
          • 集成broker
          • 集成clients
            • 定義connection factory
            • 定義destination
            • 定義consumer
            • 定義producer
    • 嵌入Activemq到其他Java容器
    • 其他語(yǔ)法訪問(wèn)Activemq
  • 第四部分 Activemq高級(jí)特性
    • Broker拓?fù)浣Y(jié)構(gòu)
      • 高可用
        • Shared Nothing Master/Slave
          • 配置
          • 場(chǎng)景
        • Shared Database Master/Slave
        • Shared File system Master/Slave
      • Broker之間的網(wǎng)絡(luò)
        • Store and Forward(存儲(chǔ)轉(zhuǎn)發(fā))
        • Network發(fā)現(xiàn)機(jī)制
        • Network配置
      • 應(yīng)用擴(kuò)展
        • 垂直擴(kuò)展
        • 水平擴(kuò)展
        • 流量分區(qū)
    • Activemq Broker高級(jí)特性
      • Wildcards and Composite Destinations
        • 訂閱Wildcard Destinations
        • 發(fā)送消息到多個(gè)destination
      • advisory message
      • Virtual Topics
      • Retroactive Consumers(可回溯消費(fèi)者)
      • 消息重投遞和死信隊(duì)列
        • 重新投遞策略
        • 死信隊(duì)列
    • 高級(jí)客戶端選項(xiàng)
      • Exclusive Consumer
      • Message Groups
      • ActiveMQ Streams
      • Blob Messages
    • 性能調(diào)優(yōu)
      • 通用技術(shù)
        • 持久化和非持久化消息
        • 事務(wù)
        • 內(nèi)嵌brokers
        • Tuning the OpenWire protocol
          • TCP Transport優(yōu)化
      • 優(yōu)化Producer
        • 異步發(fā)送
        • Producer Flow Control
      • 優(yōu)化Consumer
        • 預(yù)獲取限制(Prefetch limit)
        • 消息的投遞和確認(rèn)
        • 異步分發(fā)
      • 實(shí)踐
    • 管理和監(jiān)控ActiveMQ
      • APIs
        • JMX
          • 暴露MBean
          • 使用JMX APIs
          • 高級(jí) JMX 配置
        • 通知消息(Advisory Messages)
      • Tools
        • 命令行工具
        • Command Agent
        • JConsole
        • Web Console
      • Logging
  • 參考

第三部分 Activemq應(yīng)用

創(chuàng)建Java應(yīng)用

集成broker

嵌入broker

Broker Service

可以通過(guò)類org.apache.activemq.broker.BrokerService直接啟動(dòng)一個(gè)broker,并通過(guò)它設(shè)置配置和控制周期。

public static void main(String[] args) throws Exception {BrokerService broker = new BrokerService(); //Abroker.setBrokerName("localhost");//Abroker.setDataDirectory("data/"); //ASimpleAuthenticationPlugin authentication =new SimpleAuthenticationPlugin();List<AuthenticationUser> users =new ArrayList<AuthenticationUser>();users.add(new AuthenticationUser("admin","password","admins,publishers,consumers"));users.add(new AuthenticationUser("publisher","password","publishers,consumers"));users.add(new AuthenticationUser("consumer","password","consumers"));users.add(new AuthenticationUser("guest","password","guests"));authentication.setUsers(users);broker.setPlugins(new BrokerPlugin[]{authentication}); //Bbroker.addConnector("tcp://localhost:61616"); //Cbroker.start(); //D } //A Instantiate and configure Broker Service //B Add plugins //C Add connectors //D Start broker
Broker Factory

BrokerService適用于簡(jiǎn)單場(chǎng)景,如果想要使用自定義配置,則org.apache.activemq.broker.BrokerFactory是個(gè)合適的選擇,可以通過(guò)xml配置文件啟動(dòng)一個(gè)broker。

public class Factory {public static void main(String[] args) throws Exception {System.setProperty("activemq.base", System.getProperty("user.dir"));BrokerService broker = BrokerFactory.createBroker(new URI( #A"xbean:src/main/resources/org/apache/activemq/book/ch5/activemq-simple.xml" #A)); //Abroker.start();} } //A Creating broker from XML //URI: //file:/etc/activemq/activemq.xml //broker:(tcp://localhost:61616,network:static:tcp://remotehost:61616)?persistent=false&useJmx

使用spring集成

集成broker
package org.apache.activemq.book.ch6.spring; import org.apache.activemq.book.ch5.Publisher; import org.apache.xbean.spring.context.FileSystemXmlApplicationContext; public class SpringBroker {public static void main(String[] args) throws Exception {if (args.length == 0) {System.err.println("Please define a configuration file!");return;}String config = args[0]; //ASystem.out.println("Starting broker with the following configuration: " + config);System.setProperty("activemq.base", System.getProperty("user.dir")); //BFileSystemXmlApplicationContext //Ccontext = new FileSystemXmlApplicationContext(config); //CPublisher publisher = new Publisher(); //Dfor (int i = 0; i < 100; i++) { //Dpublisher.sendMessage(new String[]{"JAVA", "IONA"}); //D} //D} } //A Define configuration file //B Set base property //C Initialize application context //D Send messages
集成clients

Spring集成client工作:

  • 定義connection factory:
  • 定義destination:
  • 定義consumer:
  • 定義producer:
定義connection factory

使用:org.apache.activemq.ActiveMQConnectionFactory,org.apache.activemq.pool.PooledConnectionFactory

定義destination

使用:org.apache.activemq.command.ActiveMQTopic,org.apache.activemq.command.ActiveMQQueue

定義consumer

DefaultMessageListenerContainer

定義producer

org.springframework.jms.core.JmsTemplate

嵌入Activemq到其他Java容器

Activemq不僅能嵌入Java應(yīng)用,還可以嵌入:

  • 舊的Java應(yīng)用
  • Apache Tomcat
  • Apache Geronimo
  • JBoss
  • Jetty

其他語(yǔ)法訪問(wèn)Activemq

Stomp (Streaming Text Orientated Messaging Protocol),用于腳本語(yǔ)言。

第四部分 Activemq高級(jí)特性

Broker拓?fù)浣Y(jié)構(gòu)

高可用

3種方式配置master/slave 。

Shared Nothing Master/Slave

Master 和 Slave各自都單獨(dú)存儲(chǔ)持久化的消息,它們不共享數(shù)據(jù)。

所有的state需要同步到slave。Master收到持久化消息時(shí),需要先**同步(sync)**給Slave之后,才向Producer發(fā)送ACK確認(rèn)。

slave不啟動(dòng)任何transport和network,因此不能接收連接。只有Master負(fù)責(zé)Client的請(qǐng)求,Slave不接收Client請(qǐng)求。Slave連接到Master,負(fù)責(zé)備份消息。

Master出現(xiàn)故障,Slave有兩種處理方式:1、自己成為Master;2、關(guān)閉(停服務(wù))—根據(jù)具體配置而定。

客戶端配置failover:

failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

Slave 只能同步它連接到Master之后的消息。在Slave連接到Master之前Producer向Master發(fā)送的消息將不會(huì)同步給Slave,這可以通過(guò)配置(waitForSlave)參數(shù),只有當(dāng)Slave也啟動(dòng)之后,Master才開(kāi)始初始化TransportConnector接受Client的請(qǐng)求(Producer的請(qǐng)求),否則有可能丟失消息。

如果Master 或者 Slave其中之一宕機(jī),它們之間不同步的消息 無(wú)法 自動(dòng)進(jìn)行同步,此時(shí)只能手動(dòng)恢復(fù)不同步的消息了。也就是說(shuō):“ActiveMQ沒(méi)有提供任何有效的手段,能夠讓master與slave在故障恢復(fù)期間,自動(dòng)進(jìn)行數(shù)據(jù)同步”

對(duì)于非持久化消息,并不會(huì)同步給Slave。因此,Master宕機(jī),非持久化消息會(huì)丟失。

已經(jīng)運(yùn)行的broker,如果做master/slave,需要把master的數(shù)據(jù)文件copy到slave上,然后重啟maser和slave。

Master 與 Slave之間可能會(huì)出現(xiàn)“Split Brain”現(xiàn)象。比如:Master本身是正常的,但是Master與Slave之間的網(wǎng)絡(luò)出現(xiàn)故障,網(wǎng)絡(luò)故障導(dǎo)致Slave認(rèn)為Master已經(jīng)宕機(jī),因?yàn)樗约簳?huì)成為Master(根據(jù)配置:shutdownOnMasterFailure)。此時(shí),對(duì)Client而言,就會(huì)存在兩個(gè)Master。

配置

配置一個(gè)broker為slave。

<services><masterConnector remoteURI= "tcp://localhost:62001" userName="Rob" password="Davies"/> </services> 屬性默認(rèn)值描述
shutdownOnmasterFailurefalsemaster 宕機(jī)之后slave是否shutdown
屬性默認(rèn)值描述
waitForslavefalsemaster是否等slave啟動(dòng)之后才能接收連接
shutdownOnslaveFailurefalseslave 宕機(jī)之后master是否shutdown
場(chǎng)景

share database和share file 不能滿足的情況。

broker已運(yùn)行

Shared Database Master/Slave

這是很常用的一種架構(gòu)。“共享存儲(chǔ)”,意味著Master與Slave之間的數(shù)據(jù)是共享的。

那如何避免沖突呢?通過(guò)爭(zhēng)奪數(shù)據(jù)庫(kù)表的排他鎖,只有Master有鎖,未獲得鎖的自動(dòng)成為Slave。

對(duì)于“共享存儲(chǔ)”而言,只會(huì)“共享”持久化消息。對(duì)于非持久化消息,它們是在內(nèi)存中保存的。可以通過(guò)配置(forcePersistencyModeBrokerPlugin persistenceFlag)屬性強(qiáng)制所有的消息都持久化。

當(dāng)Master宕機(jī)后,Slave可自動(dòng)接管服務(wù)成為Master。由于數(shù)據(jù)是共享的,因此Master和Slave之間不需要進(jìn)行數(shù)據(jù)的復(fù)制與同步。Slave之間通過(guò)競(jìng)爭(zhēng)鎖來(lái)決定誰(shuí)是Master。

Shared File system Master/Slave

類似 data share。

可能是最好的解決方案。

Broker之間的網(wǎng)絡(luò)

Activemq網(wǎng)絡(luò)使用store和forward。

Store and Forward(存儲(chǔ)轉(zhuǎn)發(fā))

ActiveMQ的存儲(chǔ)和轉(zhuǎn)發(fā)概念意味著,消息在通過(guò)network轉(zhuǎn)發(fā)到其他broker之前,總是被存儲(chǔ)在本地broker中,也就是說(shuō),如果一條消息由于連接原因沒(méi)有被交付,比如說(shuō),正在重連,broker將能夠通過(guò)網(wǎng)絡(luò)連接將未交付的消息發(fā)送到遠(yuǎn)程broker。默認(rèn)情況下,network僅以單向方式操作

當(dāng)然,這并不是說(shuō)network只能單向操作,如果想要雙向操作,同樣可以在遠(yuǎn)程broker中配置一個(gè)network connector指向本地的broker,或者直接指定創(chuàng)建的network connector為雙向duplex。

當(dāng)本地broker和遠(yuǎn)程broker之間建立好一條network后,遠(yuǎn)程broker會(huì)將其所有持久和處于活動(dòng)的消費(fèi)者的目的地信息傳遞給本地broker,本地broker使用這些信息去判斷遠(yuǎn)程broker對(duì)哪種消息感興趣,并轉(zhuǎn)發(fā)該類型消息給它。

假如我們有多個(gè)超市需要連接到一個(gè)后臺(tái)辦公訂購(gòu)系統(tǒng),這將很難靈活擴(kuò)展新的超市,后臺(tái)辦公訂購(gòu)系統(tǒng)不好掌控所有新加入的超市即遠(yuǎn)程broker。注意到這里,超市broker和back office之間的network是雙向的,超市broker的配置:

<networkConnectors><networkConnector uri="static://(tcp://backoffice:61617)" name="bridge" duplex="true"conduitSubscriptions="true"decreaseNetworkConsumerPriority="false"></networkConnector> </networkConnectors>

這里關(guān)于配置,主要注意一點(diǎn)是,配置的順序是很重要的,關(guān)于networks,persistence,transports的順序如下:

  • Networks——必須在消息存儲(chǔ)之前創(chuàng)建
  • Message store——必須在傳輸配置好之前配置完
  • Transports——必須在broker配置的最后
  • <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://activemq.apache.org/schema/core"><broker brokerName="receiver" persistent="true" useJmx="true"><networkConnectors><networkConnector uri="static:(tcp://backoffice:61617)"/></networkConnectors><persistenceAdapter><kahaDB directory = "activemq-data"/> </persistenceAdapter><transportConnectors><transportConnector uri="tcp://localhost:62002"/></transportConnectors></broker> </beans>

    在大型開(kāi)發(fā)場(chǎng)景下的高可用性和network配置結(jié)合

    Network發(fā)現(xiàn)機(jī)制

    ActiveMQ提供兩種發(fā)現(xiàn)機(jī)制:

  • Dynamic——使用組播和會(huì)合方式搜索brokers
  • Static——通過(guò)一個(gè)URI列表配置brokers
  • 使用組播發(fā)現(xiàn)的方式去創(chuàng)建network連接是最簡(jiǎn)單粗暴的,當(dāng)你啟動(dòng)一個(gè)broker時(shí),它會(huì)通過(guò)組播IP去搜索其他的broker并創(chuàng)建network連接。配置方式如下:

    <networkConnectors><networkConnector uri="multicast://default"/> </networkConnectors>

    更多見(jiàn)前面章節(jié)。

    Network配置

    對(duì)于遠(yuǎn)程broker現(xiàn)存在的目的地,可能沒(méi)有任何活動(dòng)持久的訂閱者或消費(fèi)者,因此,當(dāng)network初始化連接到遠(yuǎn)程broker時(shí),遠(yuǎn)程broker會(huì)讀取它現(xiàn)存目的地的消息,并傳遞給本地broker,然后,本地broker也可以轉(zhuǎn)發(fā)那些目的地的消息。
    重要的是要注意,一個(gè)network將使用broker的名稱來(lái)代表遠(yuǎn)程broker創(chuàng)建唯一的持久預(yù)訂代理。 因此,如果在稍后的時(shí)間點(diǎn)更改broker的名稱,很可能會(huì)通過(guò)network丟失持久主題訂閱者的消息。 為避免這種情況,請(qǐng)確保為元素上的brokerName屬性使用唯一的名稱。 有關(guān)簡(jiǎn)要示例,請(qǐng)參閱以下內(nèi)容:

    <broker xmlns="http://activemq.apache.org/schema/core/"brokerName="brokerA"dataDirectory="${activemq.base}/data"> ...<networkConnectors><networkConnector name="brokerA to brokerB" uri="tcp://remotehost:61616"/></networkConnectors> </broker>

    應(yīng)用擴(kuò)展

    垂直擴(kuò)展

    垂直擴(kuò)展是一種用于增加單個(gè)ActiveMQ broker可以處理的連接數(shù)(因此增加負(fù)載)的技術(shù)。默認(rèn)情況下,ActiveMQ broker設(shè)計(jì)為盡可能高效地移動(dòng)消息,以確保低延遲和良好的性能。但是我們可以做一些配置調(diào)整,以確保ActiveMQ broker可以處理大量的并發(fā)連接和大量的隊(duì)列。

    默認(rèn)情況下,ActiveMQ將使用阻塞I/O來(lái)處理傳輸連接。 這導(dǎo)致每個(gè)連接使用一個(gè)線程。 我們可以在ActiveMQ broker上使用非阻塞I/O(而客戶端上仍然使用默認(rèn)傳輸)來(lái)減少使用的線程數(shù)。broker的非阻塞I/O配置如下:

    <broker><transportConnectors><transportConnector name="nio" uri="nio://localhost:61616"/></transportConnectors> </broker>

    除了每個(gè)連接使用一個(gè)線程來(lái)阻塞I/O外,ActiveMQ broker可以使用線程為每個(gè)客戶端連接分派消息。可以通過(guò)將名為org.apache.activemq.UseDedicatedTaskRunner的系統(tǒng)屬性設(shè)置為false,讓ActiveMQ使用線程池。

    ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false"

    確保ActiveMQ broker具有足夠的內(nèi)存來(lái)處理大量并發(fā)連接有兩步過(guò)程:

    • 首先,需要確保啟動(dòng)ActiveMQ broker的JVM配置了足夠的內(nèi)存。
    ACTIVEMQ_OPTS="-Xmx1024M \-Dorg.apache.activemq.UseDedicatedTaskRunner=false"
    • 第二,確保專門為ActiveMQ broker在JVM配置適當(dāng)?shù)膬?nèi)存量。此調(diào)整通過(guò)< system-Usage >元素的limit屬性進(jìn)行。(最好從512MB開(kāi)始,如果測(cè)試不夠再往上加),配置示例:
    <systemUsage><systemUsage><memoryUsage><memoryUsage limit="512 mb"/></memoryUsage><storeUsage><storeUsage limit="10 gb" name="foo"/></storeUsage><tempUsage><tempUsage limit="1 gb"/></tempUsage></systemUsage> </systemUsage>

    還應(yīng)該降低每一個(gè)連接的CPU負(fù)載,如果使用的OpenWire連接方式,禁用緊密編碼,否則會(huì)使得CPU過(guò)度緊張。

    String uri = "failover://(tcp://localhost:61616?" + " wireFormat.tightEncodingEnabled=false)"; ConnectionFactory cf = new ActiveMQConnectionFactory(uri);

    前面研究的是broker怎么調(diào)整去處理數(shù)千個(gè)連接,下面開(kāi)始研究的是怎么調(diào)整broker去處理數(shù)千個(gè)隊(duì)列。

    默認(rèn)隊(duì)列配置使用單獨(dú)的線程來(lái)將消息從消息存儲(chǔ)區(qū)分頁(yè)到隊(duì)列中,以便分發(fā)給感興趣的消息消費(fèi)者。 對(duì)于大量隊(duì)列,建議通過(guò)為所有隊(duì)列啟用optimize-Dispatch屬性來(lái)禁用此功能,

    <destinationPolicy><policyMap><policyEntries><policyEntry queue=">" optimizedDispatch="true"/></policyEntries></policyMap> </destinationPolicy>

    為了確保不僅可以擴(kuò)展到數(shù)千個(gè)連接,而且還可以擴(kuò)展到數(shù)萬(wàn)個(gè)隊(duì)列,使用JDBC消息存儲(chǔ)庫(kù)或更新和更快的KahaDB消息存儲(chǔ)庫(kù)。 KahaDB默認(rèn)情況下在ActiveMQ中啟用。

    到目前為止,我們已經(jīng)考慮了擴(kuò)展連接,減少線程使用,并選擇正確的消息存儲(chǔ)。 調(diào)整用于擴(kuò)展的ActiveMQ的示例配置如以下:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data"><persistenceAdapter><kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/></persistenceAdapter><destinationPolicy><policyMap><policyEntries><policyEntry queue="&gt;" optimizedDispatch="true"/></policyEntries></policyMap></destinationPolicy><systemUsage><systemUsage><memoryUsage><memoryUsage limit="512 mb"/></memoryUsage><storeUsage><storeUsage limit="10 gb" name="foo"/></storeUsage><tempUsage><tempUsage limit="1 gb"/></tempUsage></systemUsage></systemUsage><transportConnectors><transportConnector name="openwire" uri="nio://localhost:61616"/></transportConnectors> </broker>

    水平擴(kuò)展

    除了擴(kuò)展單個(gè)broker之外,還可以使用networks來(lái)增加可用于應(yīng)用程序的ActiveMQ broker的數(shù)量。 由于networks會(huì)自動(dòng)將消息傳遞給具有感興趣的消費(fèi)者的連接broker,因此可以將客戶端配置為連接到一個(gè)broker集群,隨機(jī)選擇一個(gè)來(lái)連接。

    failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true

    為了確保隊(duì)列或持久主題訂閱者的消息不會(huì)在broker上孤立,需要將network配置為使用dynamicOnly和低網(wǎng)絡(luò)prefetchSize。

    <networkConnector uri="static://(tcp://remotehost:61617)"name="bridge"dynamicOnly="true"prefetchSize="1"> </networkConnector>

    使用network進(jìn)行水平擴(kuò)展會(huì)帶來(lái)更多的延遲,因?yàn)闈撛诘南⒈仨氃诜职l(fā)給消費(fèi)者之前通過(guò)多個(gè)broker。

    另一種替代部署提供了巨大的可擴(kuò)展性和性能,但需要更多的應(yīng)用規(guī)劃。 這種混合解決方案稱為流量分區(qū)。

    流量分區(qū)

    客戶端流量分割是垂直和水平分割的混合。 通常不使用network,因?yàn)榭蛻舳藨?yīng)用程序決定什么流量應(yīng)該到哪個(gè)broker上。 客戶端應(yīng)用程序必須維護(hù)多個(gè)JMS連接,并決定哪些JMS連接應(yīng)用于哪些目標(biāo)。

    不直接使用network connection的優(yōu)點(diǎn)是,減少在brokers之間轉(zhuǎn)發(fā)消息的開(kāi)銷。 需要平衡這與導(dǎo)致典型應(yīng)用程序的額外復(fù)雜性。

    Activemq Broker高級(jí)特性

    Wildcards and Composite Destinations

    訂閱Wildcard Destinations

    Activemq支持 層級(jí)destination,每個(gè)層級(jí)名字部分用 點(diǎn)號(hào)(.)分隔,適用于topic和queue。

    destination名稱保留3個(gè)特殊字符:

    • .:名稱分隔符
    • *:匹配一個(gè)元素
    • >:匹配一個(gè)或多個(gè)后面的元素

    僅支持消費(fèi)者端。

    發(fā)送消息到多個(gè)destination

    Activemq支持 composite destination 特性,用于把消息發(fā)送到組合的destination(queue、topic)中。

    在使用時(shí),指定destination的name為逗號(hào)分隔的name列表:

    store.order.backoffice,store.order.warehouse #發(fā)送到2個(gè)queue。

    composite destination支持topic和queue混合模式。需要對(duì)destination加上協(xié)議

    queue://**** #queue默認(rèn)可以不用加 topic://**** #混合模式 store.orders, topic://store.orders

    advisory message

    advisory message是系統(tǒng)消息,是broker產(chǎn)生的用于通知系統(tǒng)修改的消息。包括

    • 管理對(duì)象(Connection,Destination,Consumer,Producer)加入或離開(kāi)broker
    • broker達(dá)到限制(limit)

    advisory message產(chǎn)生在系統(tǒng)定義的topic上,每個(gè)advisory message都有一個(gè)JMSType:Advisory,以及預(yù)定義的字符串屬性:

    • originBrokerId - 產(chǎn)生Advisory消息的broker的id。
    • originBrokerName -broker的名字
    • originBrokerURL - broker的url

    org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY

    默認(rèn)一些advisories 是禁用的,

    <destinationPolicy><policyMap><policyEntries><policyEntry queue=">" advisoryForSlowConsumers="true" /></policyEntries></policyMap> </destinationPolicy>

    Virtual Topics

    如果想廣播消息到多個(gè)consumer,可以使用topic。如果想一組consumer消息一個(gè)destination,則使用queue。但是沒(méi)有合適的方式廣播消息到topic,但是一組consumer 跟queue一樣共同消費(fèi)它(每條消息僅會(huì)投遞到一個(gè)consumer)。

    Virtual Topics 提供了一種簡(jiǎn)便的方式讓topic具有queue相同的語(yǔ)義。

    使用Virtual topic需要遵循:

    • topic必須以 VirtualTopic. 開(kāi)頭。例如:VirtualTopic.orders

    • queue的名字格式為:Consumer.<consumer name>.VirtualTopic.<VirtualTopic Name>

      Consumer.A.VirtualTopic.orders,Consumer.B.VirtualTopic.orders Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders"); MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue); Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders"); MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);//setup the sender Connection senderConnection = connectionFactory.createConnection(); senderConnection.start();Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders"); MessageProducer producer = senerSession.createProducer(ordersDestination);

    Retroactive Consumers(可回溯消費(fèi)者)

    為了性能,采用消息非持久化時(shí),會(huì)帶來(lái)消息不能被消費(fèi)者消費(fèi)。為了在消費(fèi)非持久化的模式下提供有限的回溯能力,Activemq提供了緩存一定size或number的消息的能力。為了實(shí)現(xiàn)此能力,需要:

    • 消費(fèi)者需要告訴broker關(guān)注可回溯消息
    • broker配置destination 需要緩存多少消息。
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection();c onnection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true"); //設(shè)置需要消費(fèi)回溯消息 MessageConsumer consumer = session.createConsumer(topic); Message result = consumer.receive();

    broker端需要設(shè)置RecoverPolicy。

    <destinationPolicy><policyMap><policyEntries><policyEntry topic=">" ><subscriptionRecoveryPolicy ><fixedSizedSubscriptionRecoveryPolicy maximumSize = "8mb"/></subscriptionRecoveryPolicy></policyEntry></policyEntries></policyMap> </destinationPolicy>

    消息重投遞和死信隊(duì)列

    當(dāng)消息不能重投遞或者消息過(guò)期,會(huì)被移到死信隊(duì)列中,由管理員消費(fèi)。

    消息重投遞的常用場(chǎng)景:

    • 事務(wù)回滾
    • 事務(wù)提交前close
    • A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.

    重新投遞策略

    RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);

    死信隊(duì)列

    重新投遞 次數(shù)超過(guò) MaximumRedeliveries ,則會(huì)進(jìn)入死信隊(duì)列。

    默認(rèn)情況,有一個(gè)死信隊(duì)列:AcitveMQ.DLQ,所有的消息都投遞到此隊(duì)列,包括過(guò)期消息,重投遞失敗消息。

    配置個(gè)性化死信隊(duì)列。

    <destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap> </destinationPolicy>

    高級(jí)客戶端選項(xiàng)

    Exclusive Consumer

    當(dāng)多個(gè)消費(fèi)者消費(fèi)一個(gè)queue時(shí),不能保證消費(fèi)的順序。如果設(shè)置consumer為exclusive,則queue會(huì)選擇一組consumer中的一個(gè),把所有消息都發(fā)送給它,相對(duì)于僅有一個(gè)消費(fèi)者的好處是,選中的consumer 不可用時(shí),會(huì)再選中另一個(gè)繼續(xù)消費(fèi)。

    當(dāng)消費(fèi)一個(gè)queue的consumer中既有exclusive的,又有非exclusive的,仍然會(huì)僅把消息投遞給exclusive consumer。當(dāng)所有非exclusive consumer都不可用,queue就會(huì)回到normal模式,queue會(huì)把消息投遞到所有 非exclusive consumer。

    queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);

    常見(jiàn)exclusive producer

    public void start() throws JMSException { this.connection = this.factory.createConnection(); this.connection.start(); this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true") Message message = this.session.createMessage(); MessageProducer producer = this.session.createProducer(destination); producer.send(message); MessageConsumer consumer = this.session.createConsumer(destination); consumer.setMessageListener(this); }

    Message Groups

    所有message都由一個(gè)cousumer消費(fèi),通過(guò)Message header JMSXGroupID,具有相同JMSXGroupID的消息會(huì)被發(fā)送到同一個(gè)consumer,這種通用概念叫消息組。broker保證同一個(gè)消息組的消息都會(huì)發(fā)送到同一個(gè)consumer。當(dāng)consumer不可連接時(shí),消息會(huì)又都路由到另外一個(gè)consumer。

    創(chuàng)建消息組

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage(" <foo>test</foo> "); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); producer.send(message);

    Activemq會(huì)為組內(nèi)的每一條消息都加一個(gè)header屬性值:JMSXGroupSeq

    如果想明確的關(guān)閉一個(gè)消息組,設(shè)置JMSXGroupSeq為-1。

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); <foo></foo> Message message = session.createTextMessage(" <foo>close</foo> "); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); message.setIntProperty("JMSXGroupSeq", -1); producer.send(message);

    不能假設(shè)JMSXGroupSeq都是從1開(kāi)始。當(dāng)一個(gè)消息組consumer不可用時(shí),所有路由到它的同組的消息都會(huì)路由到另一個(gè)consumer,JMSXGroupFirstForConsumer被設(shè)置為路由到consumer的是第一條信息,用于標(biāo)識(shí)consumer正在消費(fèi)另一個(gè)消息組或者一個(gè)新的消息組。

    Session session = MessageConsumer consumer = session.createConsumer(queue); Message message = consumer.receive(); String groupId = message.getStringProperty("JMSXGroupId"); if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) { // do processing for new group }

    如果有多個(gè)consumer消費(fèi)一個(gè)queue,很有可能把消息都發(fā)送給第一個(gè)consumer,如果為了聚恒分布式消費(fèi),可以設(shè)置broker有足夠的consumer之后再分發(fā)消息。

    等待消費(fèi)者滿足條件:

    <destinationPolicy><policyMap><policyEntries><policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000"/></policyEntries></policyMap> </destinationPolicy>

    ActiveMQ Streams

    ActiveMQ Streams是一個(gè)高級(jí)特性(需要謹(jǐn)慎使用),允許通過(guò)一個(gè)文件或socket發(fā)送數(shù)據(jù)到Activemq。

    如果僅涉及一個(gè)consumer或一個(gè)queue(or exclusive consumer),工作很好,但是其他情況,消息的順序就是個(gè)很大的問(wèn)題。

    Activemq使用JMS Stream的好處是把消息分成chunks,可以傳輸大量數(shù)據(jù)。

    通過(guò)ActiveMQ Stream發(fā)送數(shù)據(jù):

    //source of our large data FileInputStream in = new FileInputStream("largetextfile.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); //創(chuàng)建stream OutputStream out = connection.createOutputStream(destination); //now write the file on to ActiveMQ byte[] buffer = new byte[1024]; while(true){int bytesRead = in.read(buffer);if (bytesRead==-1){break;}out.write(buffer,0,bytesRead); } //close the stream so the receiving side knows the steam is finished out.close();

    ActiveMQ Stream接收數(shù)據(jù):

    //destination of our large data FileOutputStream out = new FileOutputStream("copied.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //we want be be an exclusive consumer String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true"; Queue destination = session.createQueue(exclusiveQueueName); InputStream in = connection.createInputStream(destination); //now write the file from ActiveMQ byte[] buffer = new byte[1024]; while(true){int bytesRead = in.read(buffer);if (bytesRead==-1){break;}out.write(buffer,0,bytesRead); } out.close();

    使用exclusive consumer時(shí),確保同一個(gè)時(shí)刻僅有一個(gè)consumer讀取stream。Topic也可以使用stream,但是讀取開(kāi)始之前的數(shù)據(jù)是丟失的。

    Blob Messages

    Blob Message并不包含要發(fā)送的數(shù)據(jù),僅是一個(gè)通知,告知一個(gè)Blob是可用的。Blob數(shù)據(jù)本身是在外部傳輸,例如ftp或http。Blob消息包含的數(shù)據(jù)的url,已經(jīng)獲取InputStream 得到真實(shí)數(shù)據(jù)的方法。

    Sending a BlobMessage

    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com")); producer.send(message);

    處理BlobMessage:

    // destination of our Blob data FileOutputStream out = new FileOutputStream("blob.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); BlobMessage blobMessage = (BlobMessage) consumer.receive(); //獲取stream。 InputStream in = blobMessage.getInputStream(); // now write the file from ActiveMQ byte[] buffer = new byte[1024]; while (true) {int bytesRead = in.read(buffer);if (bytesRead == -1) {break;}out.write(buffer, 0, bytesRead); } out.close();

    性能調(diào)優(yōu)

    通用技術(shù)

    非持久化(不關(guān)心消息丟失)

    持久化和非持久化消息

    持久化消息用于減少災(zāi)難性錯(cuò)誤以及保證consumer可消費(fèi)之前的數(shù)據(jù)。

    非持久化消息,僅把消息投遞給active consumer。

    非持久化消息比持久化消息快的原因:

    • 異步發(fā)送(不需要等待結(jié)果)
    • 不存儲(chǔ)

    使用持久化消息用于防范消息丟失,activemq采用:

    • 重發(fā)消息
    • 過(guò)濾掉重復(fù)消息

    設(shè)置投遞模式

    MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    事務(wù)

    事務(wù)用于批量處理數(shù)據(jù)(不同于數(shù)據(jù)庫(kù)的事務(wù),不是原子性保證)

    public void sendTransacted() throws JMSException { //create a default connection - we'll assume a broker is running //with its default configuration ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Connection connection = cf.createConnection(); connection.start(); //create a transacted session Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Topic topic = session.createTopic("Test.Transactions"); MessageProducer producer = session.createProducer(topic); int count =0;for (int i =0; i < 1000; i++) {Message message = session.createTextMessage("message " + i);producer.send(message);//commit every 10 messagesif (i!=0 && i%10==0){session.commit();}} }public void sendNonTransacted() throws JMSException { //create a default connection - we'll assume a broker is running //with its default configuration ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Connection connection = cf.createConnection(); connection.start(); //create a default session (no transactions) Session session = connection.createSession(false, Session.AUTO_ACKNOWELDGE); Topic topic = session.createTopic("Test.Transactions"); MessageProducer producer = session.createProducer(topic); int count =0;for (int i =0; i < 1000; i++) {Message message = session.createTextMessage("message " + i);producer.send(message);} }

    內(nèi)嵌brokers

    消除 序列化和網(wǎng)絡(luò)傳輸,數(shù)據(jù)在同一個(gè)JVM內(nèi)。

    BrokerService broker = new BrokerService(); broker.setBrokerName("service"); broker.setPersistent(false); broker.addConnector("tcp://localhost:61616"); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://service"); cf.setCopyMessageOnSend(false); Connection connection = cf.createConnection(); connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //we will need to respond to multiple destinations - so use null //as the destination this producer is bound to final MessageProducer producer = session.createProducer(null); //create a Consumer to listen for requests to service Queue queue = session.createQueue("service.queue"); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() {public void onMessage(Message msg) {try {TextMessage textMsg = (TextMessage)msg;String payload = "REPLY: " + textMsg.getText();Destination replyTo;replyTo = msg.getJMSReplyTo();textMsg.clearBody();textMsg.setText(payload);producer.send(replyTo, textMsg);} catch (JMSException e) {e.printStackTrace();}} });

    Connecting a QueueRequestor,用于測(cè)試內(nèi)嵌broker

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); QueueConnection connection = cf.createQueueConnection(); connection.start(); QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("service.queue"); QueueRequestor requestor = new QueueRequestor(session,queue); for(int i =0; i < 10; i++) { TextMessage msg = session.createTextMessage("test msg: " + i); TextMessage result = (TextMessage)requestor.request(msg); System.err.println("Result = " + result.getText()); }

    Tuning the OpenWire protocol

    OpenWire Protocol使用二進(jìn)制格式傳輸命令到broker,命令包含消息、應(yīng)答等信息。性能相關(guān)的格式化參數(shù)如下:

    屬性默認(rèn)值描述
    tcpNoDelayEnabledfalse是否啟用tcpNoDelay。true,則在慢網(wǎng)絡(luò)發(fā)送大量小size消息能夠提高性能
    cacheEnabledtrue如果啟用,則緩存重復(fù)的值(比如producerId和消息目的地),允許傳輸與緩存值對(duì)應(yīng)的簡(jiǎn)短的key.該設(shè)置減小了傳輸數(shù)據(jù)的大小,因而在網(wǎng)絡(luò)性能不佳時(shí)可以提升性能.但是因?yàn)樵诰彺嬷胁檎覕?shù)據(jù)會(huì)同時(shí)在客戶端和代理所在的機(jī)器中引入CPU負(fù)載的額外開(kāi)銷,配置時(shí)請(qǐng)考慮中引入開(kāi)銷的影響.
    cacheSize1024緩存條目的最大數(shù)量.該值不能超過(guò)Short.MAX_VALUE值的二分之一.當(dāng)開(kāi)啟緩存時(shí),該值設(shè)置的越大性能越好.但是因?yàn)?strong>每一個(gè)傳輸連接都獨(dú)立使用一個(gè)緩存,所以需要在代理端考慮因緩存帶來(lái)的額外開(kāi)銷,特別是當(dāng)有大量的客戶端連接到代理時(shí).
    tightEncodingEnabledtrue以CPU敏感的方式壓縮消息.建議在broker使用所有可用CPU時(shí)將該選項(xiàng)設(shè)置為停用.

    可以通過(guò)下面的方式,將上述參數(shù)附加到連接到代理的傳輸連接器的URI中:

    //使用tightEncodingEnabled參數(shù)禁用緊湊編碼(tight encoding) String uri = "failover://(tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false)"; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url); cf.setAlwaysSyncSend(true);
    TCP Transport優(yōu)化

    ActiveMQ中使用最廣泛的連接器是TCP Transport.有兩個(gè)直接影響它的性能:

    • socketBufferSize:TCP Transport用于發(fā)送和接收數(shù)據(jù)的緩沖區(qū)大小.通常該參數(shù)設(shè)置的越大越好(盡管這個(gè)最大值收到操作系統(tǒng)限制,但是可以去測(cè)試).默認(rèn)值為65536,單位是byte.
    • tcpNoDelay–默認(rèn)值為false.通常TCP套接字緩存即將被發(fā)送的小尺寸數(shù)據(jù)包.當(dāng)啟用這個(gè)參數(shù)時(shí),消息會(huì)被**盡快發(fā)送(**譯注:不緩沖).同樣可以測(cè)試這個(gè)配置,因?yàn)樾薷倪@個(gè)參數(shù)是否能提升性能還和操作系統(tǒng)有關(guān).
    String url = "failover://(tcp://localhost:61616?tcpNoDelay=true)"; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url); cf.setAlwaysSyncSend(true);

    優(yōu)化Producer

    在Producer分發(fā)消息到消息消費(fèi)者之前,Producer發(fā)送消息到broker的效率是影響整個(gè)應(yīng)用程序性能的基本因素。

    以下是幾個(gè)影響消息吞吐量和消息發(fā)送延遲 的調(diào)優(yōu)參數(shù).

    異步發(fā)送

    ActiveMQ中非持久化消息分發(fā)是可靠的,因?yàn)橄?huì)在網(wǎng)絡(luò)故障和系統(tǒng)崩潰(只要消息生產(chǎn)者依然是活動(dòng)的–此時(shí)消息生產(chǎn)者將消息緩存在失效轉(zhuǎn)移連接器的緩存中)中幸存下來(lái).但是,通過(guò)設(shè)置消息生產(chǎn)者的連接工廠的useAsyncSend屬性, 仍然 可以在使用持久化消息時(shí)獲得性能提升。

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setUseAsyncSend(true);

    上面代碼將設(shè)置一個(gè)屬性,告知消息生產(chǎn)者不要嘗試從broker獲取一個(gè)剛剛發(fā)送的消息的回執(zhí).

    如果程序要求消息確保被投遞,則推薦使用系統(tǒng)默認(rèn)的持久化消息分發(fā)模式,并且最好同時(shí)使用事務(wù).

    使用異步方式發(fā)送消息獲以取性能提升的原因不難理解,同時(shí)以這種方式實(shí)現(xiàn)性能提升的方式也很簡(jiǎn)單–只要設(shè)置ActiveMQ連接工廠的一個(gè)屬性即可.下一節(jié)將介紹ActiveMQ中一種通常不被理解的特性:消息生產(chǎn)者流控制.我們將看到大量的關(guān)于消息生產(chǎn)者效率下降或暫停問(wèn)題,并且理解 如何在應(yīng)用程序中使用流控制來(lái)減少這些問(wèn)題的發(fā)生.

    Producer Flow Control

    生產(chǎn)者流控制允許消息broker在系統(tǒng)資源緊張時(shí)降低消息的通過(guò)量.這種情況發(fā)生在消息消費(fèi)者處理 速度慢于生產(chǎn)者時(shí),此時(shí)消息會(huì)緩存在broker的內(nèi)存中等待被發(fā)送。消息生產(chǎn)者在收到broker通知有足夠存儲(chǔ)空間接收更多消息之前都會(huì)處于等待狀態(tài)。生產(chǎn)者流控制對(duì)于阻止broker的內(nèi)存和臨時(shí)存儲(chǔ)空間超過(guò)限制的值來(lái)說(shuō)是必要的,尤其是對(duì)于廣域網(wǎng)來(lái)說(shuō).

    對(duì)于持久化消息來(lái)說(shuō),生產(chǎn)者流控制默認(rèn)時(shí)就是開(kāi)啟的,但是對(duì)于異步消息發(fā)布來(lái)說(shuō)必須明確的指定為開(kāi)啟(針對(duì)持久化消息,或?qū)τ谂渲贸煽偸钱惒桨l(fā)送消息的連接來(lái)說(shuō)).可以通過(guò)設(shè)置連接工廠的producerWindowSize屬性來(lái)開(kāi)啟消息生產(chǎn)者異步發(fā)送消息的流控制.

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setProducerWindowSize(1024000);

    producerWindowSize屬性用于設(shè)置生產(chǎn)者在收到broker的存儲(chǔ)空間未超過(guò)限制值回執(zhí)之前可以用來(lái)緩存消息的字節(jié)大小,超過(guò)這個(gè)設(shè)置的值,生產(chǎn)者將等待代理回執(zhí)(而不再生產(chǎn)和發(fā)送消息)。對(duì)于一個(gè)異步發(fā)送消息的生產(chǎn)者來(lái)說(shuō),如果這個(gè)屬性未開(kāi)啟,則broker仍然會(huì)暫停消息流動(dòng),這種情況下,默認(rèn)會(huì)阻塞消息生產(chǎn)者的Transport.阻塞Transport會(huì)阻塞所有使用該Transport連接的用戶,如果消息消費(fèi)者也使用同樣的連接,則會(huì)導(dǎo)致死鎖。生產(chǎn)者流控制運(yùn)行僅阻塞消息生產(chǎn)者而不是整個(gè)消息生產(chǎn)者使用的Transport。

    盡管保護(hù)broker不要運(yùn)行在可用內(nèi)存空間低的狀態(tài)是一個(gè)不錯(cuò)的想法,但是這并不能改善 系統(tǒng)被最慢的消費(fèi)者拖慢時(shí)而產(chǎn)生的性能問(wèn)題。所以,讓我們看看禁用生產(chǎn)者流控制 時(shí)會(huì)發(fā)生什么,如下面代碼中的粗體字所示,你可以在broker的配置中配置消息destination的 策略來(lái)實(shí)現(xiàn).

    <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="10mb" /> </policyEntries> </policyMap> </destinationPolicy>

    默認(rèn)情況下,禁用了生產(chǎn)者流控制之后,發(fā)送給速度慢的消費(fèi)者的消息會(huì)被存放到臨時(shí)存儲(chǔ)空間中,以便消息生產(chǎn)者和其他消息消費(fèi)者可以盡可能快的運(yùn)行。如圖所示,broker可用的最大內(nèi)存限制決定了在什么時(shí)候 消息會(huì)通過(guò)追加到消息游標(biāo)的形式持久化到磁盤上。系統(tǒng)可用最大內(nèi)存限制的作用范圍是整個(gè)broker。這個(gè)限制應(yīng)該要低于消息destination可用內(nèi)存限制,以便在流控制之前起作用。(即,這個(gè)較小的值先起作用,則消息destination使用內(nèi)存不會(huì)超過(guò)配置的限制值,因?yàn)檫@個(gè)值較大)

    盡管有一些因?yàn)榇鎯?chǔ)消息而導(dǎo)致broker的性能損失,在禁用了生產(chǎn)者流控制之后,消息應(yīng)用程序可以獨(dú)立于最慢的消息消費(fèi)者而運(yùn)行。在理想情況下,消息消費(fèi)者總是與消息生產(chǎn)者一樣快速運(yùn)行,這就給我們引入下一節(jié)中關(guān) 于消息消費(fèi)者的優(yōu)化.

    默認(rèn)情況下,當(dāng)啟用了生產(chǎn)者流控制后,當(dāng)broker沒(méi)有空間存放更多消息時(shí),生產(chǎn)者發(fā)送消息的操作會(huì)被阻塞直到broker有足夠空間存儲(chǔ)消息。有兩種方式調(diào)整該參數(shù),使得broker獲取更多存儲(chǔ)消息空間之前,消息 生產(chǎn)者不會(huì)無(wú)限期實(shí)質(zhì)性的掛起。

    第一種調(diào)節(jié)消息生產(chǎn)者流控制的方式稱為sendFailIfNoSpace

    <systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="128 mb"/> </memoryUsage> </systemUsage> </systemUsage>

    sendFailIfNoSpace屬性將控制權(quán)返還給消息生產(chǎn)者,在代理的消息存儲(chǔ)已經(jīng)不足而生產(chǎn)者仍然嘗試發(fā)送操作時(shí),通過(guò)在生產(chǎn)者客戶端拋出異常來(lái)代替永久性的阻塞生產(chǎn)者的發(fā)送操作。這就允許生產(chǎn)者可以捕捉這個(gè)異常,然后等待 一段時(shí)間后,繼續(xù)嘗試發(fā)送操作。

    第二個(gè)調(diào)節(jié)生產(chǎn)者流控制的屬性開(kāi)始于ActiveMQ的5.4.1版本.該屬性名稱為sendFailIfNoSpaceAfterTimeout:

    <systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="5000"> <memoryUsage> <memoryUsage limit="128 mb"/> </memoryUsage> </systemUsage> </systemUsage>

    sendFailIfNoSpaceAfterTimeout與前面那個(gè)屬性稍有不同。配置了該屬性后,在等待配置的時(shí)間后,如果broker端 依然沒(méi)有足夠的空間存儲(chǔ)消息,則會(huì)在客戶端客戶端發(fā)送消息時(shí)拋出異常。

    優(yōu)化Consumer

    為了最大限度的提升應(yīng)用程序的性能,你必須關(guān)注所有影響性能的因素。到目前為止,消息消費(fèi)者在整個(gè)ActiveMQ系統(tǒng)的性能表現(xiàn)中都扮演著舉足輕重的角色。通常,消息消費(fèi)者必須要盡量以2倍于消息生產(chǎn)者的速度運(yùn)行,因?yàn)橄M(fèi)者還要通知broker消息已經(jīng)被處理了。

    ? 通常,ActiveMQ broker 會(huì)通過(guò)消費(fèi)者連接盡可能快的發(fā)送消息。通常情況下,一旦消息通過(guò)ActiveMQ broker的Transport發(fā)送完成之后,消息就加入了與消費(fèi)者關(guān)聯(lián)的session隊(duì)列中,并等待分發(fā)。在下一節(jié)中,我們將解釋消息發(fā)送給消費(fèi)者的速度為何可控以及如何控制,同時(shí)還將闡述如何調(diào)整這個(gè)消息發(fā)送速率以獲取更好的吞吐量.

    預(yù)獲取限制(Prefetch limit)

    ActiveMQ使用一種基于推送的模式來(lái)投遞消息,將broker收到的消息投遞到consumer。為了防止消費(fèi)者耗盡內(nèi)存,有一個(gè)參數(shù)(prefetch limit)可以限制broker在等待消費(fèi)者應(yīng)答消息已被應(yīng)用程序處理之前可以發(fā)送給消費(fèi)者的消息數(shù)量。在消費(fèi)者內(nèi)部,從Transport上接收的消息會(huì)被投遞并放置于一個(gè)和消費(fèi)者 session關(guān)聯(lián)的內(nèi)部隊(duì)列中。

    消費(fèi)者連接會(huì)在內(nèi)部將分發(fā)過(guò)來(lái)的消息隊(duì)列化。這個(gè)內(nèi)部的消息隊(duì)列的尺寸加上尚未發(fā)送回執(zhí) 給broker的消息 ( 這些消息已經(jīng)被消費(fèi)者接收了但是還沒(méi)有通知broker消息已被消費(fèi) ) 的數(shù)量之和受到消費(fèi)者的prefetchlimit參數(shù)限制。通常,這個(gè)prefetchlimit參數(shù)設(shè)置的越大,消費(fèi)者運(yùn)行的越快。

    但是對(duì)于消息隊(duì)列來(lái)說(shuō),設(shè)置這個(gè)限制并非是最理想方案,因?yàn)槟憧赡芟M?huì)被平均的分發(fā)給一個(gè)隊(duì)列上的所有消費(fèi)者。這種情況下,當(dāng)prefetchlimit設(shè)置的很大時(shí),處理速度較慢的消費(fèi)者可能會(huì)累積待處理的消息。而這些消息卻不能被更快的消費(fèi)者處理。這種情況下,設(shè)置較低的prefetchlimit值可能會(huì)更適合。如果prefetchlimit值設(shè)置為0,消息消費(fèi)者會(huì)主動(dòng)從broker拉取消息并且 不允許broker推送任何消息到消費(fèi)者.

    對(duì)于不同種類的消費(fèi)者而言有不同的prefetch limit默認(rèn)值:

    • 隊(duì)列消費(fèi)者的prefetch limit默認(rèn)值為1000
    • 隊(duì)列瀏覽消費(fèi)者的prefetch limit默認(rèn)值為500
    • 持久化topic消費(fèi)者的prefetch limit默認(rèn)值為100
    • 非持久化topic的prefetch limit默認(rèn)值為32766

    prefetch limit值是消息消費(fèi)者等待接收的消息的數(shù)量而不是內(nèi)存值大小.可以通過(guò)設(shè)置 ActiveMQConnectionFactory的相關(guān)屬性值值來(lái)設(shè)置prefetch limit,如下代碼所示:

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Properties props = new Properties(); props.setProperty("prefetchPolicy.queuePrefetch", "1000"); props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500"); props.setProperty("prefetchPolicy.durableTopicPrefetch", "100"); props.setProperty("prefetchPolicy.topicPrefetch", "32766"); cf.setProperties(props);

    或者,在創(chuàng)建一個(gè)消息destination時(shí),傳遞prefetch limit 參數(shù)作為消息destination的屬性:

    Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); //問(wèn)號(hào)分隔 MessageConsumer consumer = session.createConsumer(queue);

    使用Prefetch limit是一種簡(jiǎn)單的提升性能機(jī)制,但是需要謹(jǐn)慎使用。對(duì)于隊(duì)列來(lái)說(shuō),應(yīng)該考慮程序中是否有比較慢的消費(fèi)者,而對(duì)于主題來(lái)說(shuō),你需要考慮在消息被分發(fā)之前隊(duì)列 可以使用的客戶端上的最大內(nèi)存是多少.

    控制消息分發(fā)給消費(fèi)者的速率僅僅是消費(fèi)者調(diào)優(yōu)的一部分。一旦消息到達(dá)消費(fèi)者的Transport之后,消息分發(fā)到消費(fèi)者時(shí)使用的方法以及消費(fèi)者用來(lái)將消息已被處理的確認(rèn)發(fā)送給broker時(shí)使用的選項(xiàng) 就成為影響性能的重要組成部分。

    消息的投遞和確認(rèn)

    使用javax.jms.MessageListener.onMessage()來(lái)分發(fā)消息明顯比使用 javax.jms.MessageConsumer.receive()要快。如果MessageConsumer沒(méi)有設(shè)置MessageListener則該消費(fèi)者的消息會(huì)分發(fā)到隊(duì)列中然后等待調(diào)用receive()方法。不僅維護(hù)消費(fèi)者內(nèi)部隊(duì)列的代價(jià)是昂貴的,而且應(yīng)用程序線程不斷的調(diào)用receive()來(lái)切換應(yīng)用程序上下文的代價(jià)也是高昂的.

    因?yàn)锳ctiveMQ broker需要保存一個(gè)記錄以表明當(dāng)前有多少消息已被消費(fèi)來(lái)維護(hù)消費(fèi)者 內(nèi)部的prefetch limit,MessageConsumer必須為每一個(gè)消費(fèi)的消息 發(fā)送 消息確認(rèn)。如果使用了事務(wù), 當(dāng)調(diào)用Session.commit()方法是會(huì)發(fā)送 消息確認(rèn) ,但是假如使用auto-acknowledgment模式 則每個(gè)消息處理完成后都會(huì) 單獨(dú) 發(fā)送消息確認(rèn).

    有一些優(yōu)化選項(xiàng)專門用于發(fā)送消息確認(rèn)給broker,當(dāng)使用DUPS_OK_ACKNOWLEDGE session確認(rèn)模式時(shí), 這些優(yōu)化選項(xiàng)可以顯著的改善性能.另外,你可以設(shè)置ActiveMQ ConnectionFactory的optimizeAcknowledge屬性,通過(guò)給消費(fèi)者一個(gè)提示信息以便批量發(fā)送消息確認(rèn)信息.

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setOptimizeAcknowledge(true);

    當(dāng)在一個(gè)session中使用optimizeAcknowledge或DUPS_OK_ACKNOWLEDGE確認(rèn)模式時(shí),消費(fèi)者 只發(fā)送一個(gè)消息告知ActiveMQ broker一批消息已經(jīng)完成處理.這樣消息消費(fèi)者要做的工作就減少了,便于消費(fèi)者盡可能快的處理消息.

    下面的列出了ACK的不同選項(xiàng)以及使用這些選項(xiàng)后消費(fèi)者發(fā)送ACK給ActiveMQ broker的頻率.

    ACK模式發(fā)送ACK說(shuō)明
    Session.SESSION_TRANSACTED使用 Session.commit()方法批量確認(rèn)這是消息消費(fèi)的一種可靠方式,并且性能很好,允許消息一次提交中處理多個(gè)消息.
    Session.CLIENT_ACKNOWLEDGE當(dāng)一個(gè)消息確認(rèn)了 則 所有消息都確認(rèn)在確認(rèn)之前可以使消費(fèi)者消費(fèi)大量的消息
    Session.AUTO_ACKNOWLEDG每個(gè)消息處理完成后自動(dòng)發(fā)送默認(rèn)的消息確認(rèn)機(jī)制。這種方式會(huì)比較慢,消息確認(rèn)到代理.
    Session.DUPS_OK_ACKNOWLEDGE允許一個(gè)ACK 確認(rèn)一批 消息被消費(fèi)。當(dāng)消費(fèi)者收到的消息達(dá)到prefetch limit的**50%**時(shí),即發(fā)送ACK給broker.這消息處理中最快的標(biāo)準(zhǔn)的消息確認(rèn)方式.
    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE每處理一個(gè)消息就發(fā)送一次確認(rèn)最大限度的允許控制每個(gè)消息 單獨(dú) 被確認(rèn),但是會(huì)很慢.
    optimizeAcknowledge允許一個(gè)ACK 確認(rèn)一批 消息被消費(fèi)。與Session.AUTO_ACKNOWLEDGE一同起作用,在消費(fèi)者處理完的消息占到prefetch limit的 65% 時(shí)發(fā)送ACK.使用這種模式可以以最快的方式處理消息.

    單獨(dú)確認(rèn)每個(gè)消息的缺點(diǎn)是:不管消息消費(fèi)者以任何理由失去了與ActiveMQ broker連接,那么 消息應(yīng)用程序可能會(huì)收到重復(fù)的消息。但是,對(duì)于要求快速處理且不關(guān)心消息是否重復(fù)的 應(yīng)用程序(比如實(shí)時(shí)的數(shù)據(jù)源)來(lái)說(shuō),推薦使用optimizeAcknowledge模式.

    ActiveMQ的消息消費(fèi)者包含重復(fù)消息偵測(cè)機(jī)制,可以最大限度的降低收到重復(fù)消息的風(fēng)險(xiǎn).

    異步分發(fā)

    每個(gè)session都維護(hù)一個(gè)內(nèi)部的隊(duì)列,存儲(chǔ)即將被分發(fā)到各自的消費(fèi)者的消息。內(nèi)部消息隊(duì)列以及與之關(guān)聯(lián)的用于發(fā)送消息到消息消費(fèi)者的線程的使用,可能會(huì)給消息處理增加額外開(kāi)銷。

    可以禁用ActiveMQ連接工廠的alwaysSessionAsync屬性來(lái)停用上述消息隊(duì)列和消息分發(fā)線程.這種設(shè)置允許消息直接從Transport發(fā)送到消息消費(fèi)者。

    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setAlwaysSessionAsync(false);

    停用asynchronous允許消息直接發(fā)送到session內(nèi)部的隊(duì)列并由session負(fù)責(zé)進(jìn)一步分發(fā)

    實(shí)踐

    讓我們通過(guò)一個(gè)例子來(lái)看看如何綜合使用前面介紹的性能調(diào)優(yōu)方法.我們將用程序模擬一個(gè) 實(shí)時(shí)的數(shù)據(jù)源,該程序中消息生產(chǎn)者和一個(gè)嵌入式broker部署在一起,同時(shí)使用消息消費(fèi)者 監(jiān)聽(tīng)遠(yuǎn)程的消息.

    我們將闡如何述使用一個(gè)嵌入式broker來(lái)減少將消息發(fā)送到ActiveMQ代理的開(kāi)銷.我們還將調(diào)整 消息消費(fèi)者的一些選項(xiàng)來(lái)降低消息的拷貝.嵌入式broker將被配制成禁用流控制并且使用內(nèi)存 限制以允許broker快速處理消息流.

    最后,消息消費(fèi)者將會(huì)配置成 直接通過(guò) 分發(fā)方式,同時(shí)配置一個(gè)高prefetch limit值以及配置 優(yōu)化過(guò)的消息確認(rèn)模式.

    首先我們?cè)O(shè)置一個(gè)嵌入式broker,設(shè)置其可用內(nèi)存限制為一個(gè)合理的值(64M),為每一個(gè)消息destination設(shè)置可用內(nèi)存限制,并且停用消息生產(chǎn)者流控制。

    如下代碼所示,使用默認(rèn)的PolicyEntry設(shè)置broker的消息destination策略。PolicyEntry保存了ActiveMQ broker的消息destination的相關(guān)配置信息。可以為每一個(gè)消息destination 單獨(dú) 設(shè)置策略,也可以使用通配符將一個(gè)策略應(yīng)用到多個(gè)配置通配符的消息destination(比如,名稱為foo.>的PolicyEntry將僅應(yīng)用到名稱以foo開(kāi)頭的消息destination).在我們的例子中,我們僅僅設(shè)置內(nèi)存限制以及禁用生產(chǎn)者流控制.為了簡(jiǎn)單起見(jiàn),我們僅僅配置了默認(rèn)的PolicyEntry,該P(yáng)olicyEntry將應(yīng)用到所有消息destination.

    import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; ... //By default a broker always listens on vm://<broker name> BrokerService broker = new BrokerService(); broker.setBrokerName("fast"); broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024); //Set the Destination policies PolicyEntry policy = new PolicyEntry(); //set a memory limit of 4mb for each destination policy.setMemoryLimit(4 * 1024 *1024); //disable flow control policy.setProducerFlowControl(false); PolicyMap pMap = new PolicyMap(); //configure the policy pMap.setDefaultEntry(policy); broker.setDestinationPolicy(pMap); broker.addConnector("tcp://localhost:61616"); broker.start();

    上面代碼創(chuàng)建的代理使用了一個(gè)唯一的名稱fast,因此與broker同處于一個(gè)虛擬機(jī)內(nèi)的數(shù)據(jù)源生產(chǎn)者可以 使用VM Transport 綁定到該broker.

    除了使用了嵌入式broker,消息生產(chǎn)者也是直連的,除了將其配置成發(fā)送非持久化消息并且不使用消息拷貝。

    //tell the connection factory to connect to an embedded broker named fast. //if the embedded broker isn't already created, the connection factory will //create a default embedded broker named "fast" ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast"); //disable message copying cf.setCopyMessageOnSend(false); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); final MessageProducer producer = session.createProducer(topic); //send non-persistent messages producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i =0; i < 1000000;i++) {TextMessage message = session.createTextMessage("Test:"+i);producer.send(message); }

    消息消費(fèi)者被配置成直通方式(禁用了異步session分發(fā))并使用了javax.jms.MessageListener.消息消費(fèi)者使用的消息確認(rèn)模式為optimizeAcknowledge,以便能盡可能快的處理消息

    //set up the connection factory to connect the the producer's embedded broker //using tcp:// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)"); //configure the factory to create connections //with straight through processing of messages //and optimized acknowledgement cf.setAlwaysSessionAsync(false); cf.setOptimizeAcknowledge(true); Connection connection = cf.createConnection(); connection.start(); //use the default session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set the prefetch size for topics - by parsing a configuration parameter in // the name of the topic Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766"); MessageConsumer consumer = session.createConsumer(topic); //setup a counter - so we don't print every message final AtomicInteger count = new AtomicInteger(); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) {TextMessage textMessage = (TextMessage)message;try {//only print every 10,000th messageif (count.incrementAndGet()%10000==0)System.err.println("Got = " + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});

    管理和監(jiān)控ActiveMQ

    Activemq基于標(biāo)準(zhǔn)JMS API實(shí)現(xiàn)了不少功能用于管理和監(jiān)控。即可以通過(guò)編程方式,也可以通過(guò)管理工具。

    APIs

    訪問(wèn)broker的最自然的方式是通過(guò)JMX APIs。

    應(yīng)用除了消費(fèi)消息,也可以用于運(yùn)行時(shí)監(jiān)控,包括以下幾個(gè)任務(wù):

    • 獲取broker的統(tǒng)計(jì)信息
    • 新增或刪除connector。
    • 調(diào)整broker的配置。

    JMX

    從5.9開(kāi)始移除了。

    Activemq通過(guò) com.sun.management.jmxremote 啟用或禁用JMX。

    linux:

    if [ -z "$SUNJMX" ] ; then#SUNJMX="-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=false"SUNJMX="-Dcom.sun.management.jmxremote" fi

    windows:

    if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremoteREM set SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=false

    Activemq配置啟用或禁用遠(yuǎn)程JMX。通過(guò) useJmx 屬性

    <broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data"> ... </broker>
    暴露MBean

    默認(rèn),MBean是暴露的,在配置文件中有幾個(gè)屬性用于附加功能。

    <broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data"><managementContext><managementContext connectorPort="2011" jmxDomainName="my-broker" /></managementContext><!-- The transport connectors ActiveMQ will listen to --><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" /></transportConnectors> </broker>

    useJmx:啟用或禁用JMX,默認(rèn)為true。

    默認(rèn)Activemq啟用一個(gè)connector,監(jiān)聽(tīng)端口1099,用于遠(yuǎn)程管理,并且使用域名:org.apache.activemq。可以在managementContext元素更改配置。

    使用JMX APIs
    public class Stats {public static void main(String[] args) throws Exception {JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi");JMXConnector connector = JMXConnectorFactory.connect(url, null);connector.connect();//creates a connection to the MBean serverMBeanServerConnection connection = connector.getMBeanServerConnection();ObjectName name = new ObjectName("my-broker:BrokerName=localhost,Type=Broker");// queries for the broker MBeanBrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);·System.out.println("Statistics for broker " + mbean.getBrokerId() + " - " + mbean.getBrokerName());System.out.println("\n-----------------\n");System.out.println("Total message count: " + mbean.getTotalMessageCount() + "\n");System.out.println("Total number of consumers: " + mbean.getTotalConsumerCount());//grabs some broker statistics from the MBeanSystem.out.println("Total number of Queues: " + mbean.getQueues().length);for (ObjectName queueName : mbean.getQueues()) {QueueViewMBean queueMbean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, queueName,QueueViewMBean.class, true);System.out.println("\n-----------------\n");System.out.println("Statistics for queue " + queueMbean.getName());System.out.println("Size: " + queueMbean.getQueueSize());//grabs some queue statistics from the queue MBeansSystem.out.println("Number of consumers: " + queueMbean.getConsumerCount());}} }

    MBean通過(guò) 名稱引用,格式為:<jmx domain name>:BrokerName=<name of the broker>,Type=Broker

    ActiveMQ 的MBean的對(duì)象名稱默認(rèn)為:org.apache.activemq:BrokerName=localhost,Type=Broker

    MBean的方法:

    getTotalMessageCount():獲取消息的總數(shù)量

    getTotalConsumerCount():獲取消費(fèi)者的數(shù)量。

    getQueues().length():隊(duì)列的數(shù)量

    getQueues():所有隊(duì)列的名稱,名字與broker的 名字格式類似,例如:my-broker:BrokerName=localhost,Type=Queue,Destination=JOBS.suspend

    高級(jí) JMX 配置

    com.sun.management.jmxremote.port

    com.sun.management.jmxremote.authentication

    com.sun.management.jmxremote.ssl

    java.rmi.server.hostname

    配置JMX 用戶名密碼

    $JAVA_HOME/jre/lib/management/目錄下:

    jmxremote.access:定義角色和權(quán)限

    monitorRole readonly //角色和權(quán)限 controlRole readwrite

    jmxremote.password,jmxremote.password.template:映射角色和密碼,從template copy 一個(gè)文件,

    monitorRole QED //角色和密碼 controlRole R&D

    通知消息(Advisory Messages)

    實(shí)現(xiàn)了ActiveMQ的broker上各種操作的記錄跟蹤和通知,實(shí)時(shí)的知道broker上:

  • 創(chuàng)建或銷毀了連接,
  • 添加或刪除了生存者或消費(fèi)者,
  • 添加或刪除了主題或隊(duì)列,
  • 有消息發(fā)送和接收,
  • 什么時(shí)候有慢消費(fèi)者,
  • 什么時(shí)候有快生產(chǎn)者
  • 什么時(shí)候什么消息被丟棄
  • 什么時(shí)候broker被添加到集群(主從或是網(wǎng)絡(luò)連接)
  • 這個(gè)機(jī)制是ActiveMQ對(duì)JMS協(xié)議的重要補(bǔ)充,也是基于JMS實(shí)現(xiàn)的ActiveMQ的可管理性的一部分。多個(gè)ActiveMQ的相互協(xié)調(diào)和互操作的基礎(chǔ)設(shè)置。

    通知消息投遞到以 ActiveMQ.Advisory前綴命名的topic。

    啟用通知消息:

    <broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data" advisorySupport="true"> <!-- 默認(rèn)值:true,表示啟用 --><destinationPolicy><policyMap><policyEntries><policyEntry topic=">"sendAdvisoryIfNoConsumers="true"/> <!-- destination沒(méi)有消費(fèi)者則發(fā)送通知消息,每條消息都有通知消息 --></policyEntries></policyMap></destinationPolicy><!-- The transport connectors ActiveMQ will listen to --><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" /></transportConnectors> </broker>

    示例:

    Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination) MessageConsumer consumer = session.createConsumer(advisoryDestination); consumer.setMessageListener(this);public void onMessage(Message msg){if(msg instanceof ActiveMQMessage) {try {ActiveMQMessage aMsg = (ActiveMQMessage)msg;ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();} catch(JMSException e) {log.error("Failed to process message: " + msg);}} }

    Tools

    命令行工具

    #start $ cd apache-activemq-5.3.0 $ ./bin/activemq#start $ ./bin/activemq-admin start#stop $ ./bin/activemq-admin stop $ ./bin/activemq-admin stop --jmxurl service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi --jmxdomain my-broker#獲取所有 broker ./bin/activemq-admin list#查詢broker ./bin/activemq-admin query -QQueue=*#browse destination ./bin/activemq-admin browse --amqurl tcp://localhost:61616 JOBS.delete

    Command Agent

    JConsole

    Web Console

    Logging

    配置文件:conf/log4j.properties

    log4j.rootLogger=INFO, stdout, out log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.springframework=WARN log4j.logger.org.apache.xbean.spring=WARN

    客戶端配置:

    log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.springframework=WARN log4j.logger.org.apache.xbean.spring=WARN log4j.logger.org.apache.activemq.transport.failover.FailoverTransport=DEBUG log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG

    Logging Interceptor:

    <plugins> <loggingBrokerPlugin/> </plugins>

    參考

    https://activemq.apache.org/features

    https://activemq.apache.org/jmx.html#JMX

    prefetch:https://activemq.apache.org/what-is-the-prefetch-limit-for

    通知消息:https://activemq.apache.org/advisory-message

    https://activemq.apache.org/mdc-logging

    總結(jié)

    以上是生活随笔為你收集整理的Activemq-In-action(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。