Activemq-In-action(三)
文章目錄
- 第三部分 Activemq應(yīng)用
- 創(chuàng)建Java應(yīng)用
- 集成broker
- 嵌入broker
- Broker Service
- Broker Factory
- 使用spring集成
- 集成broker
- 集成clients
- 定義connection factory
- 定義destination
- 定義consumer
- 定義producer
- 嵌入Activemq到其他Java容器
- 其他語(yǔ)法訪問(wèn)Activemq
- 第四部分 Activemq高級(jí)特性
- Broker拓?fù)浣Y(jié)構(gòu)
- 高可用
- Shared Nothing Master/Slave
- 配置
- 場(chǎng)景
- Shared Database Master/Slave
- Shared File system Master/Slave
- Broker之間的網(wǎng)絡(luò)
- Store and Forward(存儲(chǔ)轉(zhuǎn)發(fā))
- Network發(fā)現(xiàn)機(jī)制
- Network配置
- 應(yīng)用擴(kuò)展
- 垂直擴(kuò)展
- 水平擴(kuò)展
- 流量分區(qū)
- Activemq Broker高級(jí)特性
- Wildcards and Composite Destinations
- 訂閱Wildcard Destinations
- 發(fā)送消息到多個(gè)destination
- advisory message
- Virtual Topics
- Retroactive Consumers(可回溯消費(fèi)者)
- 消息重投遞和死信隊(duì)列
- 重新投遞策略
- 死信隊(duì)列
- 高級(jí)客戶端選項(xiàng)
- Exclusive Consumer
- Message Groups
- ActiveMQ Streams
- Blob Messages
- 性能調(diào)優(yōu)
- 通用技術(shù)
- 持久化和非持久化消息
- 事務(wù)
- 內(nèi)嵌brokers
- Tuning the OpenWire protocol
- TCP Transport優(yōu)化
- 優(yōu)化Producer
- 異步發(fā)送
- Producer Flow Control
- 優(yōu)化Consumer
- 預(yù)獲取限制(Prefetch limit)
- 消息的投遞和確認(rèn)
- 異步分發(fā)
- 實(shí)踐
- 管理和監(jiān)控ActiveMQ
- APIs
- JMX
- 暴露MBean
- 使用JMX APIs
- 高級(jí) JMX 配置
- 通知消息(Advisory Messages)
- Tools
- 命令行工具
- Command Agent
- JConsole
- Web Console
- Logging
- 參考
第三部分 Activemq應(yīng)用
創(chuàng)建Java應(yīng)用
集成broker
嵌入broker
Broker Service
可以通過(guò)類org.apache.activemq.broker.BrokerService直接啟動(dòng)一個(gè)broker,并通過(guò)它設(shè)置配置和控制周期。
public static void main(String[] args) throws Exception {BrokerService broker = new BrokerService(); //Abroker.setBrokerName("localhost");//Abroker.setDataDirectory("data/"); //ASimpleAuthenticationPlugin authentication =new SimpleAuthenticationPlugin();List<AuthenticationUser> users =new ArrayList<AuthenticationUser>();users.add(new AuthenticationUser("admin","password","admins,publishers,consumers"));users.add(new AuthenticationUser("publisher","password","publishers,consumers"));users.add(new AuthenticationUser("consumer","password","consumers"));users.add(new AuthenticationUser("guest","password","guests"));authentication.setUsers(users);broker.setPlugins(new BrokerPlugin[]{authentication}); //Bbroker.addConnector("tcp://localhost:61616"); //Cbroker.start(); //D } //A Instantiate and configure Broker Service //B Add plugins //C Add connectors //D Start brokerBroker Factory
BrokerService適用于簡(jiǎn)單場(chǎng)景,如果想要使用自定義配置,則org.apache.activemq.broker.BrokerFactory是個(gè)合適的選擇,可以通過(guò)xml配置文件啟動(dòng)一個(gè)broker。
public class Factory {public static void main(String[] args) throws Exception {System.setProperty("activemq.base", System.getProperty("user.dir"));BrokerService broker = BrokerFactory.createBroker(new URI( #A"xbean:src/main/resources/org/apache/activemq/book/ch5/activemq-simple.xml" #A)); //Abroker.start();} } //A Creating broker from XML //URI: //file:/etc/activemq/activemq.xml //broker:(tcp://localhost:61616,network:static:tcp://remotehost:61616)?persistent=false&useJmx使用spring集成
集成broker
package org.apache.activemq.book.ch6.spring; import org.apache.activemq.book.ch5.Publisher; import org.apache.xbean.spring.context.FileSystemXmlApplicationContext; public class SpringBroker {public static void main(String[] args) throws Exception {if (args.length == 0) {System.err.println("Please define a configuration file!");return;}String config = args[0]; //ASystem.out.println("Starting broker with the following configuration: " + config);System.setProperty("activemq.base", System.getProperty("user.dir")); //BFileSystemXmlApplicationContext //Ccontext = new FileSystemXmlApplicationContext(config); //CPublisher publisher = new Publisher(); //Dfor (int i = 0; i < 100; i++) { //Dpublisher.sendMessage(new String[]{"JAVA", "IONA"}); //D} //D} } //A Define configuration file //B Set base property //C Initialize application context //D Send messages集成clients
Spring集成client工作:
- 定義connection factory:
- 定義destination:
- 定義consumer:
- 定義producer:
定義connection factory
使用:org.apache.activemq.ActiveMQConnectionFactory,org.apache.activemq.pool.PooledConnectionFactory
定義destination
使用:org.apache.activemq.command.ActiveMQTopic,org.apache.activemq.command.ActiveMQQueue
定義consumer
DefaultMessageListenerContainer
定義producer
org.springframework.jms.core.JmsTemplate
嵌入Activemq到其他Java容器
Activemq不僅能嵌入Java應(yīng)用,還可以嵌入:
- 舊的Java應(yīng)用
- Apache Tomcat
- Apache Geronimo
- JBoss
- Jetty
其他語(yǔ)法訪問(wèn)Activemq
Stomp (Streaming Text Orientated Messaging Protocol),用于腳本語(yǔ)言。
略
第四部分 Activemq高級(jí)特性
Broker拓?fù)浣Y(jié)構(gòu)
高可用
3種方式配置master/slave 。
Shared Nothing Master/Slave
Master 和 Slave各自都單獨(dú)存儲(chǔ)持久化的消息,它們不共享數(shù)據(jù)。
所有的state需要同步到slave。Master收到持久化消息時(shí),需要先**同步(sync)**給Slave之后,才向Producer發(fā)送ACK確認(rèn)。
slave不啟動(dòng)任何transport和network,因此不能接收連接。只有Master負(fù)責(zé)Client的請(qǐng)求,Slave不接收Client請(qǐng)求。Slave連接到Master,負(fù)責(zé)備份消息。
Master出現(xiàn)故障,Slave有兩種處理方式:1、自己成為Master;2、關(guān)閉(停服務(wù))—根據(jù)具體配置而定。
客戶端配置failover:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=falseSlave 只能同步它連接到Master之后的消息。在Slave連接到Master之前Producer向Master發(fā)送的消息將不會(huì)同步給Slave,這可以通過(guò)配置(waitForSlave)參數(shù),只有當(dāng)Slave也啟動(dòng)之后,Master才開(kāi)始初始化TransportConnector接受Client的請(qǐng)求(Producer的請(qǐng)求),否則有可能丟失消息。
如果Master 或者 Slave其中之一宕機(jī),它們之間不同步的消息 無(wú)法 自動(dòng)進(jìn)行同步,此時(shí)只能手動(dòng)恢復(fù)不同步的消息了。也就是說(shuō):“ActiveMQ沒(méi)有提供任何有效的手段,能夠讓master與slave在故障恢復(fù)期間,自動(dòng)進(jìn)行數(shù)據(jù)同步”
對(duì)于非持久化消息,并不會(huì)同步給Slave。因此,Master宕機(jī),非持久化消息會(huì)丟失。
已經(jīng)運(yùn)行的broker,如果做master/slave,需要把master的數(shù)據(jù)文件copy到slave上,然后重啟maser和slave。
Master 與 Slave之間可能會(huì)出現(xiàn)“Split Brain”現(xiàn)象。比如:Master本身是正常的,但是Master與Slave之間的網(wǎng)絡(luò)出現(xiàn)故障,網(wǎng)絡(luò)故障導(dǎo)致Slave認(rèn)為Master已經(jīng)宕機(jī),因?yàn)樗约簳?huì)成為Master(根據(jù)配置:shutdownOnMasterFailure)。此時(shí),對(duì)Client而言,就會(huì)存在兩個(gè)Master。
配置
配置一個(gè)broker為slave。
<services><masterConnector remoteURI= "tcp://localhost:62001" userName="Rob" password="Davies"/> </services>| shutdownOnmasterFailure | false | master 宕機(jī)之后slave是否shutdown |
| waitForslave | false | master是否等slave啟動(dòng)之后才能接收連接 |
| shutdownOnslaveFailure | false | slave 宕機(jī)之后master是否shutdown |
場(chǎng)景
share database和share file 不能滿足的情況。
broker已運(yùn)行
Shared Database Master/Slave
這是很常用的一種架構(gòu)。“共享存儲(chǔ)”,意味著Master與Slave之間的數(shù)據(jù)是共享的。
那如何避免沖突呢?通過(guò)爭(zhēng)奪數(shù)據(jù)庫(kù)表的排他鎖,只有Master有鎖,未獲得鎖的自動(dòng)成為Slave。
對(duì)于“共享存儲(chǔ)”而言,只會(huì)“共享”持久化消息。對(duì)于非持久化消息,它們是在內(nèi)存中保存的。可以通過(guò)配置(forcePersistencyModeBrokerPlugin persistenceFlag)屬性強(qiáng)制所有的消息都持久化。
當(dāng)Master宕機(jī)后,Slave可自動(dòng)接管服務(wù)成為Master。由于數(shù)據(jù)是共享的,因此Master和Slave之間不需要進(jìn)行數(shù)據(jù)的復(fù)制與同步。Slave之間通過(guò)競(jìng)爭(zhēng)鎖來(lái)決定誰(shuí)是Master。
Shared File system Master/Slave
類似 data share。
可能是最好的解決方案。
Broker之間的網(wǎng)絡(luò)
Activemq網(wǎng)絡(luò)使用store和forward。
Store and Forward(存儲(chǔ)轉(zhuǎn)發(fā))
ActiveMQ的存儲(chǔ)和轉(zhuǎn)發(fā)概念意味著,消息在通過(guò)network轉(zhuǎn)發(fā)到其他broker之前,總是被存儲(chǔ)在本地broker中,也就是說(shuō),如果一條消息由于連接原因沒(méi)有被交付,比如說(shuō),正在重連,broker將能夠通過(guò)網(wǎng)絡(luò)連接將未交付的消息發(fā)送到遠(yuǎn)程broker。默認(rèn)情況下,network僅以單向方式操作
當(dāng)然,這并不是說(shuō)network只能單向操作,如果想要雙向操作,同樣可以在遠(yuǎn)程broker中配置一個(gè)network connector指向本地的broker,或者直接指定創(chuàng)建的network connector為雙向duplex。
當(dāng)本地broker和遠(yuǎn)程broker之間建立好一條network后,遠(yuǎn)程broker會(huì)將其所有持久和處于活動(dòng)的消費(fèi)者的目的地信息傳遞給本地broker,本地broker使用這些信息去判斷遠(yuǎn)程broker對(duì)哪種消息感興趣,并轉(zhuǎn)發(fā)該類型消息給它。
假如我們有多個(gè)超市需要連接到一個(gè)后臺(tái)辦公訂購(gòu)系統(tǒng),這將很難靈活擴(kuò)展新的超市,后臺(tái)辦公訂購(gòu)系統(tǒng)不好掌控所有新加入的超市即遠(yuǎn)程broker。注意到這里,超市broker和back office之間的network是雙向的,超市broker的配置:
<networkConnectors><networkConnector uri="static://(tcp://backoffice:61617)" name="bridge" duplex="true"conduitSubscriptions="true"decreaseNetworkConsumerPriority="false"></networkConnector> </networkConnectors>這里關(guān)于配置,主要注意一點(diǎn)是,配置的順序是很重要的,關(guān)于networks,persistence,transports的順序如下:
在大型開(kāi)發(fā)場(chǎng)景下的高可用性和network配置結(jié)合
Network發(fā)現(xiàn)機(jī)制
ActiveMQ提供兩種發(fā)現(xiàn)機(jī)制:
使用組播發(fā)現(xiàn)的方式去創(chuàng)建network連接是最簡(jiǎn)單粗暴的,當(dāng)你啟動(dòng)一個(gè)broker時(shí),它會(huì)通過(guò)組播IP去搜索其他的broker并創(chuàng)建network連接。配置方式如下:
<networkConnectors><networkConnector uri="multicast://default"/> </networkConnectors>更多見(jiàn)前面章節(jié)。
Network配置
對(duì)于遠(yuǎn)程broker現(xiàn)存在的目的地,可能沒(méi)有任何活動(dòng)持久的訂閱者或消費(fèi)者,因此,當(dāng)network初始化連接到遠(yuǎn)程broker時(shí),遠(yuǎn)程broker會(huì)讀取它現(xiàn)存目的地的消息,并傳遞給本地broker,然后,本地broker也可以轉(zhuǎn)發(fā)那些目的地的消息。
重要的是要注意,一個(gè)network將使用broker的名稱來(lái)代表遠(yuǎn)程broker創(chuàng)建唯一的持久預(yù)訂代理。 因此,如果在稍后的時(shí)間點(diǎn)更改broker的名稱,很可能會(huì)通過(guò)network丟失持久主題訂閱者的消息。 為避免這種情況,請(qǐng)確保為元素上的brokerName屬性使用唯一的名稱。 有關(guān)簡(jiǎn)要示例,請(qǐng)參閱以下內(nèi)容:
應(yīng)用擴(kuò)展
垂直擴(kuò)展
垂直擴(kuò)展是一種用于增加單個(gè)ActiveMQ broker可以處理的連接數(shù)(因此增加負(fù)載)的技術(shù)。默認(rèn)情況下,ActiveMQ broker設(shè)計(jì)為盡可能高效地移動(dòng)消息,以確保低延遲和良好的性能。但是我們可以做一些配置調(diào)整,以確保ActiveMQ broker可以處理大量的并發(fā)連接和大量的隊(duì)列。
默認(rèn)情況下,ActiveMQ將使用阻塞I/O來(lái)處理傳輸連接。 這導(dǎo)致每個(gè)連接使用一個(gè)線程。 我們可以在ActiveMQ broker上使用非阻塞I/O(而客戶端上仍然使用默認(rèn)傳輸)來(lái)減少使用的線程數(shù)。broker的非阻塞I/O配置如下:
<broker><transportConnectors><transportConnector name="nio" uri="nio://localhost:61616"/></transportConnectors> </broker>除了每個(gè)連接使用一個(gè)線程來(lái)阻塞I/O外,ActiveMQ broker可以使用線程為每個(gè)客戶端連接分派消息。可以通過(guò)將名為org.apache.activemq.UseDedicatedTaskRunner的系統(tǒng)屬性設(shè)置為false,讓ActiveMQ使用線程池。
ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false"確保ActiveMQ broker具有足夠的內(nèi)存來(lái)處理大量并發(fā)連接有兩步過(guò)程:
- 首先,需要確保啟動(dòng)ActiveMQ broker的JVM配置了足夠的內(nèi)存。
- 第二,確保專門為ActiveMQ broker在JVM配置適當(dāng)?shù)膬?nèi)存量。此調(diào)整通過(guò)< system-Usage >元素的limit屬性進(jìn)行。(最好從512MB開(kāi)始,如果測(cè)試不夠再往上加),配置示例:
還應(yīng)該降低每一個(gè)連接的CPU負(fù)載,如果使用的OpenWire連接方式,禁用緊密編碼,否則會(huì)使得CPU過(guò)度緊張。
String uri = "failover://(tcp://localhost:61616?" + " wireFormat.tightEncodingEnabled=false)"; ConnectionFactory cf = new ActiveMQConnectionFactory(uri);前面研究的是broker怎么調(diào)整去處理數(shù)千個(gè)連接,下面開(kāi)始研究的是怎么調(diào)整broker去處理數(shù)千個(gè)隊(duì)列。
默認(rèn)隊(duì)列配置使用單獨(dú)的線程來(lái)將消息從消息存儲(chǔ)區(qū)分頁(yè)到隊(duì)列中,以便分發(fā)給感興趣的消息消費(fèi)者。 對(duì)于大量隊(duì)列,建議通過(guò)為所有隊(duì)列啟用optimize-Dispatch屬性來(lái)禁用此功能,
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">" optimizedDispatch="true"/></policyEntries></policyMap> </destinationPolicy>為了確保不僅可以擴(kuò)展到數(shù)千個(gè)連接,而且還可以擴(kuò)展到數(shù)萬(wàn)個(gè)隊(duì)列,使用JDBC消息存儲(chǔ)庫(kù)或更新和更快的KahaDB消息存儲(chǔ)庫(kù)。 KahaDB默認(rèn)情況下在ActiveMQ中啟用。
到目前為止,我們已經(jīng)考慮了擴(kuò)展連接,減少線程使用,并選擇正確的消息存儲(chǔ)。 調(diào)整用于擴(kuò)展的ActiveMQ的示例配置如以下:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data"><persistenceAdapter><kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/></persistenceAdapter><destinationPolicy><policyMap><policyEntries><policyEntry queue=">" optimizedDispatch="true"/></policyEntries></policyMap></destinationPolicy><systemUsage><systemUsage><memoryUsage><memoryUsage limit="512 mb"/></memoryUsage><storeUsage><storeUsage limit="10 gb" name="foo"/></storeUsage><tempUsage><tempUsage limit="1 gb"/></tempUsage></systemUsage></systemUsage><transportConnectors><transportConnector name="openwire" uri="nio://localhost:61616"/></transportConnectors> </broker>水平擴(kuò)展
除了擴(kuò)展單個(gè)broker之外,還可以使用networks來(lái)增加可用于應(yīng)用程序的ActiveMQ broker的數(shù)量。 由于networks會(huì)自動(dòng)將消息傳遞給具有感興趣的消費(fèi)者的連接broker,因此可以將客戶端配置為連接到一個(gè)broker集群,隨機(jī)選擇一個(gè)來(lái)連接。
failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true為了確保隊(duì)列或持久主題訂閱者的消息不會(huì)在broker上孤立,需要將network配置為使用dynamicOnly和低網(wǎng)絡(luò)prefetchSize。
<networkConnector uri="static://(tcp://remotehost:61617)"name="bridge"dynamicOnly="true"prefetchSize="1"> </networkConnector>使用network進(jìn)行水平擴(kuò)展會(huì)帶來(lái)更多的延遲,因?yàn)闈撛诘南⒈仨氃诜职l(fā)給消費(fèi)者之前通過(guò)多個(gè)broker。
另一種替代部署提供了巨大的可擴(kuò)展性和性能,但需要更多的應(yīng)用規(guī)劃。 這種混合解決方案稱為流量分區(qū)。
流量分區(qū)
客戶端流量分割是垂直和水平分割的混合。 通常不使用network,因?yàn)榭蛻舳藨?yīng)用程序決定什么流量應(yīng)該到哪個(gè)broker上。 客戶端應(yīng)用程序必須維護(hù)多個(gè)JMS連接,并決定哪些JMS連接應(yīng)用于哪些目標(biāo)。
不直接使用network connection的優(yōu)點(diǎn)是,減少在brokers之間轉(zhuǎn)發(fā)消息的開(kāi)銷。 需要平衡這與導(dǎo)致典型應(yīng)用程序的額外復(fù)雜性。
Activemq Broker高級(jí)特性
Wildcards and Composite Destinations
訂閱Wildcard Destinations
Activemq支持 層級(jí)destination,每個(gè)層級(jí)名字部分用 點(diǎn)號(hào)(.)分隔,適用于topic和queue。
destination名稱保留3個(gè)特殊字符:
- .:名稱分隔符
- *:匹配一個(gè)元素
- >:匹配一個(gè)或多個(gè)后面的元素
僅支持消費(fèi)者端。
發(fā)送消息到多個(gè)destination
Activemq支持 composite destination 特性,用于把消息發(fā)送到組合的destination(queue、topic)中。
在使用時(shí),指定destination的name為逗號(hào)分隔的name列表:
store.order.backoffice,store.order.warehouse #發(fā)送到2個(gè)queue。composite destination支持topic和queue混合模式。需要對(duì)destination加上協(xié)議
queue://**** #queue默認(rèn)可以不用加 topic://**** #混合模式 store.orders, topic://store.ordersadvisory message
advisory message是系統(tǒng)消息,是broker產(chǎn)生的用于通知系統(tǒng)修改的消息。包括
- 管理對(duì)象(Connection,Destination,Consumer,Producer)加入或離開(kāi)broker
- broker達(dá)到限制(limit)
advisory message產(chǎn)生在系統(tǒng)定義的topic上,每個(gè)advisory message都有一個(gè)JMSType:Advisory,以及預(yù)定義的字符串屬性:
- originBrokerId - 產(chǎn)生Advisory消息的broker的id。
- originBrokerName -broker的名字
- originBrokerURL - broker的url
org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY
默認(rèn)一些advisories 是禁用的,
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">" advisoryForSlowConsumers="true" /></policyEntries></policyMap> </destinationPolicy>Virtual Topics
如果想廣播消息到多個(gè)consumer,可以使用topic。如果想一組consumer消息一個(gè)destination,則使用queue。但是沒(méi)有合適的方式廣播消息到topic,但是一組consumer 跟queue一樣共同消費(fèi)它(每條消息僅會(huì)投遞到一個(gè)consumer)。
Virtual Topics 提供了一種簡(jiǎn)便的方式讓topic具有queue相同的語(yǔ)義。
使用Virtual topic需要遵循:
-
topic必須以 VirtualTopic. 開(kāi)頭。例如:VirtualTopic.orders
-
queue的名字格式為:Consumer.<consumer name>.VirtualTopic.<VirtualTopic Name>
Consumer.A.VirtualTopic.orders,Consumer.B.VirtualTopic.orders Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders"); MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue); Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders"); MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);//setup the sender Connection senderConnection = connectionFactory.createConnection(); senderConnection.start();Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders"); MessageProducer producer = senerSession.createProducer(ordersDestination);
Retroactive Consumers(可回溯消費(fèi)者)
為了性能,采用消息非持久化時(shí),會(huì)帶來(lái)消息不能被消費(fèi)者消費(fèi)。為了在消費(fèi)非持久化的模式下提供有限的回溯能力,Activemq提供了緩存一定size或number的消息的能力。為了實(shí)現(xiàn)此能力,需要:
- 消費(fèi)者需要告訴broker關(guān)注可回溯消息
- broker配置destination 需要緩存多少消息。
broker端需要設(shè)置RecoverPolicy。
<destinationPolicy><policyMap><policyEntries><policyEntry topic=">" ><subscriptionRecoveryPolicy ><fixedSizedSubscriptionRecoveryPolicy maximumSize = "8mb"/></subscriptionRecoveryPolicy></policyEntry></policyEntries></policyMap> </destinationPolicy>消息重投遞和死信隊(duì)列
當(dāng)消息不能重投遞或者消息過(guò)期,會(huì)被移到死信隊(duì)列中,由管理員消費(fèi)。
消息重投遞的常用場(chǎng)景:
- 事務(wù)回滾
- 事務(wù)提交前close
- A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.
重新投遞策略
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);死信隊(duì)列
重新投遞 次數(shù)超過(guò) MaximumRedeliveries ,則會(huì)進(jìn)入死信隊(duì)列。
默認(rèn)情況,有一個(gè)死信隊(duì)列:AcitveMQ.DLQ,所有的消息都投遞到此隊(duì)列,包括過(guò)期消息,重投遞失敗消息。
配置個(gè)性化死信隊(duì)列。
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap> </destinationPolicy>高級(jí)客戶端選項(xiàng)
Exclusive Consumer
當(dāng)多個(gè)消費(fèi)者消費(fèi)一個(gè)queue時(shí),不能保證消費(fèi)的順序。如果設(shè)置consumer為exclusive,則queue會(huì)選擇一組consumer中的一個(gè),把所有消息都發(fā)送給它,相對(duì)于僅有一個(gè)消費(fèi)者的好處是,選中的consumer 不可用時(shí),會(huì)再選中另一個(gè)繼續(xù)消費(fèi)。
當(dāng)消費(fèi)一個(gè)queue的consumer中既有exclusive的,又有非exclusive的,仍然會(huì)僅把消息投遞給exclusive consumer。當(dāng)所有非exclusive consumer都不可用,queue就會(huì)回到normal模式,queue會(huì)把消息投遞到所有 非exclusive consumer。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);常見(jiàn)exclusive producer
public void start() throws JMSException { this.connection = this.factory.createConnection(); this.connection.start(); this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true") Message message = this.session.createMessage(); MessageProducer producer = this.session.createProducer(destination); producer.send(message); MessageConsumer consumer = this.session.createConsumer(destination); consumer.setMessageListener(this); }Message Groups
所有message都由一個(gè)cousumer消費(fèi),通過(guò)Message header JMSXGroupID,具有相同JMSXGroupID的消息會(huì)被發(fā)送到同一個(gè)consumer,這種通用概念叫消息組。broker保證同一個(gè)消息組的消息都會(huì)發(fā)送到同一個(gè)consumer。當(dāng)consumer不可連接時(shí),消息會(huì)又都路由到另外一個(gè)consumer。
創(chuàng)建消息組
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage(" <foo>test</foo> "); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); producer.send(message);Activemq會(huì)為組內(nèi)的每一條消息都加一個(gè)header屬性值:JMSXGroupSeq
如果想明確的關(guān)閉一個(gè)消息組,設(shè)置JMSXGroupSeq為-1。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); <foo></foo> Message message = session.createTextMessage(" <foo>close</foo> "); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); message.setIntProperty("JMSXGroupSeq", -1); producer.send(message);不能假設(shè)JMSXGroupSeq都是從1開(kāi)始。當(dāng)一個(gè)消息組consumer不可用時(shí),所有路由到它的同組的消息都會(huì)路由到另一個(gè)consumer,JMSXGroupFirstForConsumer被設(shè)置為路由到consumer的是第一條信息,用于標(biāo)識(shí)consumer正在消費(fèi)另一個(gè)消息組或者一個(gè)新的消息組。
Session session = MessageConsumer consumer = session.createConsumer(queue); Message message = consumer.receive(); String groupId = message.getStringProperty("JMSXGroupId"); if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) { // do processing for new group }如果有多個(gè)consumer消費(fèi)一個(gè)queue,很有可能把消息都發(fā)送給第一個(gè)consumer,如果為了聚恒分布式消費(fèi),可以設(shè)置broker有足夠的consumer之后再分發(fā)消息。
等待消費(fèi)者滿足條件:
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000"/></policyEntries></policyMap> </destinationPolicy>ActiveMQ Streams
ActiveMQ Streams是一個(gè)高級(jí)特性(需要謹(jǐn)慎使用),允許通過(guò)一個(gè)文件或socket發(fā)送數(shù)據(jù)到Activemq。
如果僅涉及一個(gè)consumer或一個(gè)queue(or exclusive consumer),工作很好,但是其他情況,消息的順序就是個(gè)很大的問(wèn)題。
Activemq使用JMS Stream的好處是把消息分成chunks,可以傳輸大量數(shù)據(jù)。
通過(guò)ActiveMQ Stream發(fā)送數(shù)據(jù):
//source of our large data FileInputStream in = new FileInputStream("largetextfile.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); //創(chuàng)建stream OutputStream out = connection.createOutputStream(destination); //now write the file on to ActiveMQ byte[] buffer = new byte[1024]; while(true){int bytesRead = in.read(buffer);if (bytesRead==-1){break;}out.write(buffer,0,bytesRead); } //close the stream so the receiving side knows the steam is finished out.close();ActiveMQ Stream接收數(shù)據(jù):
//destination of our large data FileOutputStream out = new FileOutputStream("copied.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //we want be be an exclusive consumer String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true"; Queue destination = session.createQueue(exclusiveQueueName); InputStream in = connection.createInputStream(destination); //now write the file from ActiveMQ byte[] buffer = new byte[1024]; while(true){int bytesRead = in.read(buffer);if (bytesRead==-1){break;}out.write(buffer,0,bytesRead); } out.close();使用exclusive consumer時(shí),確保同一個(gè)時(shí)刻僅有一個(gè)consumer讀取stream。Topic也可以使用stream,但是讀取開(kāi)始之前的數(shù)據(jù)是丟失的。
Blob Messages
Blob Message并不包含要發(fā)送的數(shù)據(jù),僅是一個(gè)通知,告知一個(gè)Blob是可用的。Blob數(shù)據(jù)本身是在外部傳輸,例如ftp或http。Blob消息包含的數(shù)據(jù)的url,已經(jīng)獲取InputStream 得到真實(shí)數(shù)據(jù)的方法。
Sending a BlobMessage
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com")); producer.send(message);處理BlobMessage:
// destination of our Blob data FileOutputStream out = new FileOutputStream("blob.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); BlobMessage blobMessage = (BlobMessage) consumer.receive(); //獲取stream。 InputStream in = blobMessage.getInputStream(); // now write the file from ActiveMQ byte[] buffer = new byte[1024]; while (true) {int bytesRead = in.read(buffer);if (bytesRead == -1) {break;}out.write(buffer, 0, bytesRead); } out.close();性能調(diào)優(yōu)
通用技術(shù)
非持久化(不關(guān)心消息丟失)
持久化和非持久化消息
持久化消息用于減少災(zāi)難性錯(cuò)誤以及保證consumer可消費(fèi)之前的數(shù)據(jù)。
非持久化消息,僅把消息投遞給active consumer。
非持久化消息比持久化消息快的原因:
- 異步發(fā)送(不需要等待結(jié)果)
- 不存儲(chǔ)
使用持久化消息用于防范消息丟失,activemq采用:
- 重發(fā)消息
- 過(guò)濾掉重復(fù)消息
設(shè)置投遞模式
MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);事務(wù)
事務(wù)用于批量處理數(shù)據(jù)(不同于數(shù)據(jù)庫(kù)的事務(wù),不是原子性保證)
public void sendTransacted() throws JMSException { //create a default connection - we'll assume a broker is running //with its default configuration ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Connection connection = cf.createConnection(); connection.start(); //create a transacted session Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Topic topic = session.createTopic("Test.Transactions"); MessageProducer producer = session.createProducer(topic); int count =0;for (int i =0; i < 1000; i++) {Message message = session.createTextMessage("message " + i);producer.send(message);//commit every 10 messagesif (i!=0 && i%10==0){session.commit();}} }public void sendNonTransacted() throws JMSException { //create a default connection - we'll assume a broker is running //with its default configuration ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Connection connection = cf.createConnection(); connection.start(); //create a default session (no transactions) Session session = connection.createSession(false, Session.AUTO_ACKNOWELDGE); Topic topic = session.createTopic("Test.Transactions"); MessageProducer producer = session.createProducer(topic); int count =0;for (int i =0; i < 1000; i++) {Message message = session.createTextMessage("message " + i);producer.send(message);} }內(nèi)嵌brokers
消除 序列化和網(wǎng)絡(luò)傳輸,數(shù)據(jù)在同一個(gè)JVM內(nèi)。
BrokerService broker = new BrokerService(); broker.setBrokerName("service"); broker.setPersistent(false); broker.addConnector("tcp://localhost:61616"); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://service"); cf.setCopyMessageOnSend(false); Connection connection = cf.createConnection(); connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //we will need to respond to multiple destinations - so use null //as the destination this producer is bound to final MessageProducer producer = session.createProducer(null); //create a Consumer to listen for requests to service Queue queue = session.createQueue("service.queue"); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() {public void onMessage(Message msg) {try {TextMessage textMsg = (TextMessage)msg;String payload = "REPLY: " + textMsg.getText();Destination replyTo;replyTo = msg.getJMSReplyTo();textMsg.clearBody();textMsg.setText(payload);producer.send(replyTo, textMsg);} catch (JMSException e) {e.printStackTrace();}} });Connecting a QueueRequestor,用于測(cè)試內(nèi)嵌broker
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); QueueConnection connection = cf.createQueueConnection(); connection.start(); QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("service.queue"); QueueRequestor requestor = new QueueRequestor(session,queue); for(int i =0; i < 10; i++) { TextMessage msg = session.createTextMessage("test msg: " + i); TextMessage result = (TextMessage)requestor.request(msg); System.err.println("Result = " + result.getText()); }Tuning the OpenWire protocol
OpenWire Protocol使用二進(jìn)制格式傳輸命令到broker,命令包含消息、應(yīng)答等信息。性能相關(guān)的格式化參數(shù)如下:
| tcpNoDelayEnabled | false | 是否啟用tcpNoDelay。true,則在慢網(wǎng)絡(luò)發(fā)送大量小size消息能夠提高性能 |
| cacheEnabled | true | 如果啟用,則緩存重復(fù)的值(比如producerId和消息目的地),允許傳輸與緩存值對(duì)應(yīng)的簡(jiǎn)短的key.該設(shè)置減小了傳輸數(shù)據(jù)的大小,因而在網(wǎng)絡(luò)性能不佳時(shí)可以提升性能.但是因?yàn)樵诰彺嬷胁檎覕?shù)據(jù)會(huì)同時(shí)在客戶端和代理所在的機(jī)器中引入CPU負(fù)載的額外開(kāi)銷,配置時(shí)請(qǐng)考慮中引入開(kāi)銷的影響. |
| cacheSize | 1024 | 緩存條目的最大數(shù)量.該值不能超過(guò)Short.MAX_VALUE值的二分之一.當(dāng)開(kāi)啟緩存時(shí),該值設(shè)置的越大性能越好.但是因?yàn)?strong>每一個(gè)傳輸連接都獨(dú)立使用一個(gè)緩存,所以需要在代理端考慮因緩存帶來(lái)的額外開(kāi)銷,特別是當(dāng)有大量的客戶端連接到代理時(shí). |
| tightEncodingEnabled | true | 以CPU敏感的方式壓縮消息.建議在broker使用所有可用CPU時(shí)將該選項(xiàng)設(shè)置為停用. |
可以通過(guò)下面的方式,將上述參數(shù)附加到連接到代理的傳輸連接器的URI中:
//使用tightEncodingEnabled參數(shù)禁用緊湊編碼(tight encoding) String uri = "failover://(tcp://localhost:61616?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false)"; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url); cf.setAlwaysSyncSend(true);TCP Transport優(yōu)化
ActiveMQ中使用最廣泛的連接器是TCP Transport.有兩個(gè)直接影響它的性能:
- socketBufferSize:TCP Transport用于發(fā)送和接收數(shù)據(jù)的緩沖區(qū)大小.通常該參數(shù)設(shè)置的越大越好(盡管這個(gè)最大值收到操作系統(tǒng)限制,但是可以去測(cè)試).默認(rèn)值為65536,單位是byte.
- tcpNoDelay–默認(rèn)值為false.通常TCP套接字緩存即將被發(fā)送的小尺寸數(shù)據(jù)包.當(dāng)啟用這個(gè)參數(shù)時(shí),消息會(huì)被**盡快發(fā)送(**譯注:不緩沖).同樣可以測(cè)試這個(gè)配置,因?yàn)樾薷倪@個(gè)參數(shù)是否能提升性能還和操作系統(tǒng)有關(guān).
優(yōu)化Producer
在Producer分發(fā)消息到消息消費(fèi)者之前,Producer發(fā)送消息到broker的效率是影響整個(gè)應(yīng)用程序性能的基本因素。
以下是幾個(gè)影響消息吞吐量和消息發(fā)送延遲 的調(diào)優(yōu)參數(shù).
異步發(fā)送
ActiveMQ中非持久化消息分發(fā)是可靠的,因?yàn)橄?huì)在網(wǎng)絡(luò)故障和系統(tǒng)崩潰(只要消息生產(chǎn)者依然是活動(dòng)的–此時(shí)消息生產(chǎn)者將消息緩存在失效轉(zhuǎn)移連接器的緩存中)中幸存下來(lái).但是,通過(guò)設(shè)置消息生產(chǎn)者的連接工廠的useAsyncSend屬性, 仍然 可以在使用持久化消息時(shí)獲得性能提升。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setUseAsyncSend(true);上面代碼將設(shè)置一個(gè)屬性,告知消息生產(chǎn)者不要嘗試從broker獲取一個(gè)剛剛發(fā)送的消息的回執(zhí).
如果程序要求消息確保被投遞,則推薦使用系統(tǒng)默認(rèn)的持久化消息分發(fā)模式,并且最好同時(shí)使用事務(wù).
使用異步方式發(fā)送消息獲以取性能提升的原因不難理解,同時(shí)以這種方式實(shí)現(xiàn)性能提升的方式也很簡(jiǎn)單–只要設(shè)置ActiveMQ連接工廠的一個(gè)屬性即可.下一節(jié)將介紹ActiveMQ中一種通常不被理解的特性:消息生產(chǎn)者流控制.我們將看到大量的關(guān)于消息生產(chǎn)者效率下降或暫停問(wèn)題,并且理解 如何在應(yīng)用程序中使用流控制來(lái)減少這些問(wèn)題的發(fā)生.
Producer Flow Control
生產(chǎn)者流控制允許消息broker在系統(tǒng)資源緊張時(shí)降低消息的通過(guò)量.這種情況發(fā)生在消息消費(fèi)者處理 速度慢于生產(chǎn)者時(shí),此時(shí)消息會(huì)緩存在broker的內(nèi)存中等待被發(fā)送。消息生產(chǎn)者在收到broker通知有足夠存儲(chǔ)空間接收更多消息之前都會(huì)處于等待狀態(tài)。生產(chǎn)者流控制對(duì)于阻止broker的內(nèi)存和臨時(shí)存儲(chǔ)空間超過(guò)限制的值來(lái)說(shuō)是必要的,尤其是對(duì)于廣域網(wǎng)來(lái)說(shuō).
對(duì)于持久化消息來(lái)說(shuō),生產(chǎn)者流控制默認(rèn)時(shí)就是開(kāi)啟的,但是對(duì)于異步消息發(fā)布來(lái)說(shuō)必須明確的指定為開(kāi)啟(針對(duì)持久化消息,或?qū)τ谂渲贸煽偸钱惒桨l(fā)送消息的連接來(lái)說(shuō)).可以通過(guò)設(shè)置連接工廠的producerWindowSize屬性來(lái)開(kāi)啟消息生產(chǎn)者異步發(fā)送消息的流控制.
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setProducerWindowSize(1024000);producerWindowSize屬性用于設(shè)置生產(chǎn)者在收到broker的存儲(chǔ)空間未超過(guò)限制值回執(zhí)之前可以用來(lái)緩存消息的字節(jié)大小,超過(guò)這個(gè)設(shè)置的值,生產(chǎn)者將等待代理回執(zhí)(而不再生產(chǎn)和發(fā)送消息)。對(duì)于一個(gè)異步發(fā)送消息的生產(chǎn)者來(lái)說(shuō),如果這個(gè)屬性未開(kāi)啟,則broker仍然會(huì)暫停消息流動(dòng),這種情況下,默認(rèn)會(huì)阻塞消息生產(chǎn)者的Transport.阻塞Transport會(huì)阻塞所有使用該Transport連接的用戶,如果消息消費(fèi)者也使用同樣的連接,則會(huì)導(dǎo)致死鎖。生產(chǎn)者流控制運(yùn)行僅阻塞消息生產(chǎn)者而不是整個(gè)消息生產(chǎn)者使用的Transport。
盡管保護(hù)broker不要運(yùn)行在可用內(nèi)存空間低的狀態(tài)是一個(gè)不錯(cuò)的想法,但是這并不能改善 系統(tǒng)被最慢的消費(fèi)者拖慢時(shí)而產(chǎn)生的性能問(wèn)題。所以,讓我們看看禁用生產(chǎn)者流控制 時(shí)會(huì)發(fā)生什么,如下面代碼中的粗體字所示,你可以在broker的配置中配置消息destination的 策略來(lái)實(shí)現(xiàn).
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="10mb" /> </policyEntries> </policyMap> </destinationPolicy>默認(rèn)情況下,禁用了生產(chǎn)者流控制之后,發(fā)送給速度慢的消費(fèi)者的消息會(huì)被存放到臨時(shí)存儲(chǔ)空間中,以便消息生產(chǎn)者和其他消息消費(fèi)者可以盡可能快的運(yùn)行。如圖所示,broker可用的最大內(nèi)存限制決定了在什么時(shí)候 消息會(huì)通過(guò)追加到消息游標(biāo)的形式持久化到磁盤上。系統(tǒng)可用最大內(nèi)存限制的作用范圍是整個(gè)broker。這個(gè)限制應(yīng)該要低于消息destination可用內(nèi)存限制,以便在流控制之前起作用。(即,這個(gè)較小的值先起作用,則消息destination使用內(nèi)存不會(huì)超過(guò)配置的限制值,因?yàn)檫@個(gè)值較大)
盡管有一些因?yàn)榇鎯?chǔ)消息而導(dǎo)致broker的性能損失,在禁用了生產(chǎn)者流控制之后,消息應(yīng)用程序可以獨(dú)立于最慢的消息消費(fèi)者而運(yùn)行。在理想情況下,消息消費(fèi)者總是與消息生產(chǎn)者一樣快速運(yùn)行,這就給我們引入下一節(jié)中關(guān) 于消息消費(fèi)者的優(yōu)化.
默認(rèn)情況下,當(dāng)啟用了生產(chǎn)者流控制后,當(dāng)broker沒(méi)有空間存放更多消息時(shí),生產(chǎn)者發(fā)送消息的操作會(huì)被阻塞直到broker有足夠空間存儲(chǔ)消息。有兩種方式調(diào)整該參數(shù),使得broker獲取更多存儲(chǔ)消息空間之前,消息 生產(chǎn)者不會(huì)無(wú)限期實(shí)質(zhì)性的掛起。
第一種調(diào)節(jié)消息生產(chǎn)者流控制的方式稱為sendFailIfNoSpace
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="128 mb"/> </memoryUsage> </systemUsage> </systemUsage>sendFailIfNoSpace屬性將控制權(quán)返還給消息生產(chǎn)者,在代理的消息存儲(chǔ)已經(jīng)不足而生產(chǎn)者仍然嘗試發(fā)送操作時(shí),通過(guò)在生產(chǎn)者客戶端拋出異常來(lái)代替永久性的阻塞生產(chǎn)者的發(fā)送操作。這就允許生產(chǎn)者可以捕捉這個(gè)異常,然后等待 一段時(shí)間后,繼續(xù)嘗試發(fā)送操作。
第二個(gè)調(diào)節(jié)生產(chǎn)者流控制的屬性開(kāi)始于ActiveMQ的5.4.1版本.該屬性名稱為sendFailIfNoSpaceAfterTimeout:
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="5000"> <memoryUsage> <memoryUsage limit="128 mb"/> </memoryUsage> </systemUsage> </systemUsage>sendFailIfNoSpaceAfterTimeout與前面那個(gè)屬性稍有不同。配置了該屬性后,在等待配置的時(shí)間后,如果broker端 依然沒(méi)有足夠的空間存儲(chǔ)消息,則會(huì)在客戶端客戶端發(fā)送消息時(shí)拋出異常。
優(yōu)化Consumer
為了最大限度的提升應(yīng)用程序的性能,你必須關(guān)注所有影響性能的因素。到目前為止,消息消費(fèi)者在整個(gè)ActiveMQ系統(tǒng)的性能表現(xiàn)中都扮演著舉足輕重的角色。通常,消息消費(fèi)者必須要盡量以2倍于消息生產(chǎn)者的速度運(yùn)行,因?yàn)橄M(fèi)者還要通知broker消息已經(jīng)被處理了。
? 通常,ActiveMQ broker 會(huì)通過(guò)消費(fèi)者連接盡可能快的發(fā)送消息。通常情況下,一旦消息通過(guò)ActiveMQ broker的Transport發(fā)送完成之后,消息就加入了與消費(fèi)者關(guān)聯(lián)的session隊(duì)列中,并等待分發(fā)。在下一節(jié)中,我們將解釋消息發(fā)送給消費(fèi)者的速度為何可控以及如何控制,同時(shí)還將闡述如何調(diào)整這個(gè)消息發(fā)送速率以獲取更好的吞吐量.
預(yù)獲取限制(Prefetch limit)
ActiveMQ使用一種基于推送的模式來(lái)投遞消息,將broker收到的消息投遞到consumer。為了防止消費(fèi)者耗盡內(nèi)存,有一個(gè)參數(shù)(prefetch limit)可以限制broker在等待消費(fèi)者應(yīng)答消息已被應(yīng)用程序處理之前可以發(fā)送給消費(fèi)者的消息數(shù)量。在消費(fèi)者內(nèi)部,從Transport上接收的消息會(huì)被投遞并放置于一個(gè)和消費(fèi)者 session關(guān)聯(lián)的內(nèi)部隊(duì)列中。
消費(fèi)者連接會(huì)在內(nèi)部將分發(fā)過(guò)來(lái)的消息隊(duì)列化。這個(gè)內(nèi)部的消息隊(duì)列的尺寸加上尚未發(fā)送回執(zhí) 給broker的消息 ( 這些消息已經(jīng)被消費(fèi)者接收了但是還沒(méi)有通知broker消息已被消費(fèi) ) 的數(shù)量之和受到消費(fèi)者的prefetchlimit參數(shù)限制。通常,這個(gè)prefetchlimit參數(shù)設(shè)置的越大,消費(fèi)者運(yùn)行的越快。
但是對(duì)于消息隊(duì)列來(lái)說(shuō),設(shè)置這個(gè)限制并非是最理想方案,因?yàn)槟憧赡芟M?huì)被平均的分發(fā)給一個(gè)隊(duì)列上的所有消費(fèi)者。這種情況下,當(dāng)prefetchlimit設(shè)置的很大時(shí),處理速度較慢的消費(fèi)者可能會(huì)累積待處理的消息。而這些消息卻不能被更快的消費(fèi)者處理。這種情況下,設(shè)置較低的prefetchlimit值可能會(huì)更適合。如果prefetchlimit值設(shè)置為0,消息消費(fèi)者會(huì)主動(dòng)從broker拉取消息并且 不允許broker推送任何消息到消費(fèi)者.
對(duì)于不同種類的消費(fèi)者而言有不同的prefetch limit默認(rèn)值:
- 隊(duì)列消費(fèi)者的prefetch limit默認(rèn)值為1000
- 隊(duì)列瀏覽消費(fèi)者的prefetch limit默認(rèn)值為500
- 持久化topic消費(fèi)者的prefetch limit默認(rèn)值為100
- 非持久化topic的prefetch limit默認(rèn)值為32766
prefetch limit值是消息消費(fèi)者等待接收的消息的數(shù)量而不是內(nèi)存值大小.可以通過(guò)設(shè)置 ActiveMQConnectionFactory的相關(guān)屬性值值來(lái)設(shè)置prefetch limit,如下代碼所示:
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Properties props = new Properties(); props.setProperty("prefetchPolicy.queuePrefetch", "1000"); props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500"); props.setProperty("prefetchPolicy.durableTopicPrefetch", "100"); props.setProperty("prefetchPolicy.topicPrefetch", "32766"); cf.setProperties(props);或者,在創(chuàng)建一個(gè)消息destination時(shí),傳遞prefetch limit 參數(shù)作為消息destination的屬性:
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); //問(wèn)號(hào)分隔 MessageConsumer consumer = session.createConsumer(queue);使用Prefetch limit是一種簡(jiǎn)單的提升性能機(jī)制,但是需要謹(jǐn)慎使用。對(duì)于隊(duì)列來(lái)說(shuō),應(yīng)該考慮程序中是否有比較慢的消費(fèi)者,而對(duì)于主題來(lái)說(shuō),你需要考慮在消息被分發(fā)之前隊(duì)列 可以使用的客戶端上的最大內(nèi)存是多少.
控制消息分發(fā)給消費(fèi)者的速率僅僅是消費(fèi)者調(diào)優(yōu)的一部分。一旦消息到達(dá)消費(fèi)者的Transport之后,消息分發(fā)到消費(fèi)者時(shí)使用的方法以及消費(fèi)者用來(lái)將消息已被處理的確認(rèn)發(fā)送給broker時(shí)使用的選項(xiàng) 就成為影響性能的重要組成部分。
消息的投遞和確認(rèn)
使用javax.jms.MessageListener.onMessage()來(lái)分發(fā)消息明顯比使用 javax.jms.MessageConsumer.receive()要快。如果MessageConsumer沒(méi)有設(shè)置MessageListener則該消費(fèi)者的消息會(huì)分發(fā)到隊(duì)列中然后等待調(diào)用receive()方法。不僅維護(hù)消費(fèi)者內(nèi)部隊(duì)列的代價(jià)是昂貴的,而且應(yīng)用程序線程不斷的調(diào)用receive()來(lái)切換應(yīng)用程序上下文的代價(jià)也是高昂的.
因?yàn)锳ctiveMQ broker需要保存一個(gè)記錄以表明當(dāng)前有多少消息已被消費(fèi)來(lái)維護(hù)消費(fèi)者 內(nèi)部的prefetch limit,MessageConsumer必須為每一個(gè)消費(fèi)的消息 發(fā)送 消息確認(rèn)。如果使用了事務(wù), 當(dāng)調(diào)用Session.commit()方法是會(huì)發(fā)送 消息確認(rèn) ,但是假如使用auto-acknowledgment模式 則每個(gè)消息處理完成后都會(huì) 單獨(dú) 發(fā)送消息確認(rèn).
有一些優(yōu)化選項(xiàng)專門用于發(fā)送消息確認(rèn)給broker,當(dāng)使用DUPS_OK_ACKNOWLEDGE session確認(rèn)模式時(shí), 這些優(yōu)化選項(xiàng)可以顯著的改善性能.另外,你可以設(shè)置ActiveMQ ConnectionFactory的optimizeAcknowledge屬性,通過(guò)給消費(fèi)者一個(gè)提示信息以便批量發(fā)送消息確認(rèn)信息.
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setOptimizeAcknowledge(true);當(dāng)在一個(gè)session中使用optimizeAcknowledge或DUPS_OK_ACKNOWLEDGE確認(rèn)模式時(shí),消費(fèi)者 只發(fā)送一個(gè)消息告知ActiveMQ broker一批消息已經(jīng)完成處理.這樣消息消費(fèi)者要做的工作就減少了,便于消費(fèi)者盡可能快的處理消息.
下面的列出了ACK的不同選項(xiàng)以及使用這些選項(xiàng)后消費(fèi)者發(fā)送ACK給ActiveMQ broker的頻率.
| Session.SESSION_TRANSACTED | 使用 Session.commit()方法批量確認(rèn) | 這是消息消費(fèi)的一種可靠方式,并且性能很好,允許消息一次提交中處理多個(gè)消息. |
| Session.CLIENT_ACKNOWLEDGE | 當(dāng)一個(gè)消息確認(rèn)了 則 所有消息都確認(rèn) | 在確認(rèn)之前可以使消費(fèi)者消費(fèi)大量的消息 |
| Session.AUTO_ACKNOWLEDG | 每個(gè)消息處理完成后自動(dòng)發(fā)送 | 默認(rèn)的消息確認(rèn)機(jī)制。這種方式會(huì)比較慢,消息確認(rèn)到代理. |
| Session.DUPS_OK_ACKNOWLEDGE | 允許一個(gè)ACK 確認(rèn)一批 消息被消費(fèi)。 | 當(dāng)消費(fèi)者收到的消息達(dá)到prefetch limit的**50%**時(shí),即發(fā)送ACK給broker.這消息處理中最快的標(biāo)準(zhǔn)的消息確認(rèn)方式. |
| ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE | 每處理一個(gè)消息就發(fā)送一次確認(rèn) | 最大限度的允許控制每個(gè)消息 單獨(dú) 被確認(rèn),但是會(huì)很慢. |
| optimizeAcknowledge | 允許一個(gè)ACK 確認(rèn)一批 消息被消費(fèi)。 | 與Session.AUTO_ACKNOWLEDGE一同起作用,在消費(fèi)者處理完的消息占到prefetch limit的 65% 時(shí)發(fā)送ACK.使用這種模式可以以最快的方式處理消息. |
單獨(dú)確認(rèn)每個(gè)消息的缺點(diǎn)是:不管消息消費(fèi)者以任何理由失去了與ActiveMQ broker連接,那么 消息應(yīng)用程序可能會(huì)收到重復(fù)的消息。但是,對(duì)于要求快速處理且不關(guān)心消息是否重復(fù)的 應(yīng)用程序(比如實(shí)時(shí)的數(shù)據(jù)源)來(lái)說(shuō),推薦使用optimizeAcknowledge模式.
ActiveMQ的消息消費(fèi)者包含重復(fù)消息偵測(cè)機(jī)制,可以最大限度的降低收到重復(fù)消息的風(fēng)險(xiǎn).
異步分發(fā)
每個(gè)session都維護(hù)一個(gè)內(nèi)部的隊(duì)列,存儲(chǔ)即將被分發(fā)到各自的消費(fèi)者的消息。內(nèi)部消息隊(duì)列以及與之關(guān)聯(lián)的用于發(fā)送消息到消息消費(fèi)者的線程的使用,可能會(huì)給消息處理增加額外開(kāi)銷。
可以禁用ActiveMQ連接工廠的alwaysSessionAsync屬性來(lái)停用上述消息隊(duì)列和消息分發(fā)線程.這種設(shè)置允許消息直接從Transport發(fā)送到消息消費(fèi)者。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setAlwaysSessionAsync(false);停用asynchronous允許消息直接發(fā)送到session內(nèi)部的隊(duì)列并由session負(fù)責(zé)進(jìn)一步分發(fā)
實(shí)踐
讓我們通過(guò)一個(gè)例子來(lái)看看如何綜合使用前面介紹的性能調(diào)優(yōu)方法.我們將用程序模擬一個(gè) 實(shí)時(shí)的數(shù)據(jù)源,該程序中消息生產(chǎn)者和一個(gè)嵌入式broker部署在一起,同時(shí)使用消息消費(fèi)者 監(jiān)聽(tīng)遠(yuǎn)程的消息.
我們將闡如何述使用一個(gè)嵌入式broker來(lái)減少將消息發(fā)送到ActiveMQ代理的開(kāi)銷.我們還將調(diào)整 消息消費(fèi)者的一些選項(xiàng)來(lái)降低消息的拷貝.嵌入式broker將被配制成禁用流控制并且使用內(nèi)存 限制以允許broker快速處理消息流.
最后,消息消費(fèi)者將會(huì)配置成 直接通過(guò) 分發(fā)方式,同時(shí)配置一個(gè)高prefetch limit值以及配置 優(yōu)化過(guò)的消息確認(rèn)模式.
首先我們?cè)O(shè)置一個(gè)嵌入式broker,設(shè)置其可用內(nèi)存限制為一個(gè)合理的值(64M),為每一個(gè)消息destination設(shè)置可用內(nèi)存限制,并且停用消息生產(chǎn)者流控制。
如下代碼所示,使用默認(rèn)的PolicyEntry設(shè)置broker的消息destination策略。PolicyEntry保存了ActiveMQ broker的消息destination的相關(guān)配置信息。可以為每一個(gè)消息destination 單獨(dú) 設(shè)置策略,也可以使用通配符將一個(gè)策略應(yīng)用到多個(gè)配置通配符的消息destination(比如,名稱為foo.>的PolicyEntry將僅應(yīng)用到名稱以foo開(kāi)頭的消息destination).在我們的例子中,我們僅僅設(shè)置內(nèi)存限制以及禁用生產(chǎn)者流控制.為了簡(jiǎn)單起見(jiàn),我們僅僅配置了默認(rèn)的PolicyEntry,該P(yáng)olicyEntry將應(yīng)用到所有消息destination.
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; ... //By default a broker always listens on vm://<broker name> BrokerService broker = new BrokerService(); broker.setBrokerName("fast"); broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024); //Set the Destination policies PolicyEntry policy = new PolicyEntry(); //set a memory limit of 4mb for each destination policy.setMemoryLimit(4 * 1024 *1024); //disable flow control policy.setProducerFlowControl(false); PolicyMap pMap = new PolicyMap(); //configure the policy pMap.setDefaultEntry(policy); broker.setDestinationPolicy(pMap); broker.addConnector("tcp://localhost:61616"); broker.start();上面代碼創(chuàng)建的代理使用了一個(gè)唯一的名稱fast,因此與broker同處于一個(gè)虛擬機(jī)內(nèi)的數(shù)據(jù)源生產(chǎn)者可以 使用VM Transport 綁定到該broker.
除了使用了嵌入式broker,消息生產(chǎn)者也是直連的,除了將其配置成發(fā)送非持久化消息并且不使用消息拷貝。
//tell the connection factory to connect to an embedded broker named fast. //if the embedded broker isn't already created, the connection factory will //create a default embedded broker named "fast" ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://fast"); //disable message copying cf.setCopyMessageOnSend(false); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); final MessageProducer producer = session.createProducer(topic); //send non-persistent messages producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i =0; i < 1000000;i++) {TextMessage message = session.createTextMessage("Test:"+i);producer.send(message); }消息消費(fèi)者被配置成直通方式(禁用了異步session分發(fā))并使用了javax.jms.MessageListener.消息消費(fèi)者使用的消息確認(rèn)模式為optimizeAcknowledge,以便能盡可能快的處理消息
//set up the connection factory to connect the the producer's embedded broker //using tcp:// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover://(tcp://localhost:61616)"); //configure the factory to create connections //with straight through processing of messages //and optimized acknowledgement cf.setAlwaysSessionAsync(false); cf.setOptimizeAcknowledge(true); Connection connection = cf.createConnection(); connection.start(); //use the default session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set the prefetch size for topics - by parsing a configuration parameter in // the name of the topic Topic topic = session.createTopic("test.topic?consumer.prefetchSize=32766"); MessageConsumer consumer = session.createConsumer(topic); //setup a counter - so we don't print every message final AtomicInteger count = new AtomicInteger(); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) {TextMessage textMessage = (TextMessage)message;try {//only print every 10,000th messageif (count.incrementAndGet()%10000==0)System.err.println("Got = " + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});管理和監(jiān)控ActiveMQ
Activemq基于標(biāo)準(zhǔn)JMS API實(shí)現(xiàn)了不少功能用于管理和監(jiān)控。即可以通過(guò)編程方式,也可以通過(guò)管理工具。
APIs
訪問(wèn)broker的最自然的方式是通過(guò)JMX APIs。
應(yīng)用除了消費(fèi)消息,也可以用于運(yùn)行時(shí)監(jiān)控,包括以下幾個(gè)任務(wù):
- 獲取broker的統(tǒng)計(jì)信息
- 新增或刪除connector。
- 調(diào)整broker的配置。
JMX
從5.9開(kāi)始移除了。
Activemq通過(guò) com.sun.management.jmxremote 啟用或禁用JMX。
linux:
if [ -z "$SUNJMX" ] ; then#SUNJMX="-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=false"SUNJMX="-Dcom.sun.management.jmxremote" fiwindows:
if "%SUNJMX%" == "" set SUNJMX=-Dcom.sun.management.jmxremoteREM set SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=-Dcom.sun.management.jmxremote.ssl=falseActivemq配置啟用或禁用遠(yuǎn)程JMX。通過(guò) useJmx 屬性
<broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data"> ... </broker>暴露MBean
默認(rèn),MBean是暴露的,在配置文件中有幾個(gè)屬性用于附加功能。
<broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data"><managementContext><managementContext connectorPort="2011" jmxDomainName="my-broker" /></managementContext><!-- The transport connectors ActiveMQ will listen to --><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" /></transportConnectors> </broker>useJmx:啟用或禁用JMX,默認(rèn)為true。
默認(rèn)Activemq啟用一個(gè)connector,監(jiān)聽(tīng)端口1099,用于遠(yuǎn)程管理,并且使用域名:org.apache.activemq。可以在managementContext元素更改配置。
使用JMX APIs
public class Stats {public static void main(String[] args) throws Exception {JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi");JMXConnector connector = JMXConnectorFactory.connect(url, null);connector.connect();//creates a connection to the MBean serverMBeanServerConnection connection = connector.getMBeanServerConnection();ObjectName name = new ObjectName("my-broker:BrokerName=localhost,Type=Broker");// queries for the broker MBeanBrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);·System.out.println("Statistics for broker " + mbean.getBrokerId() + " - " + mbean.getBrokerName());System.out.println("\n-----------------\n");System.out.println("Total message count: " + mbean.getTotalMessageCount() + "\n");System.out.println("Total number of consumers: " + mbean.getTotalConsumerCount());//grabs some broker statistics from the MBeanSystem.out.println("Total number of Queues: " + mbean.getQueues().length);for (ObjectName queueName : mbean.getQueues()) {QueueViewMBean queueMbean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(connection, queueName,QueueViewMBean.class, true);System.out.println("\n-----------------\n");System.out.println("Statistics for queue " + queueMbean.getName());System.out.println("Size: " + queueMbean.getQueueSize());//grabs some queue statistics from the queue MBeansSystem.out.println("Number of consumers: " + queueMbean.getConsumerCount());}} }MBean通過(guò) 名稱引用,格式為:<jmx domain name>:BrokerName=<name of the broker>,Type=Broker
ActiveMQ 的MBean的對(duì)象名稱默認(rèn)為:org.apache.activemq:BrokerName=localhost,Type=Broker
MBean的方法:
getTotalMessageCount():獲取消息的總數(shù)量
getTotalConsumerCount():獲取消費(fèi)者的數(shù)量。
getQueues().length():隊(duì)列的數(shù)量
getQueues():所有隊(duì)列的名稱,名字與broker的 名字格式類似,例如:my-broker:BrokerName=localhost,Type=Queue,Destination=JOBS.suspend
高級(jí) JMX 配置
com.sun.management.jmxremote.port
com.sun.management.jmxremote.authentication
com.sun.management.jmxremote.ssl
java.rmi.server.hostname
配置JMX 用戶名密碼
$JAVA_HOME/jre/lib/management/目錄下:
jmxremote.access:定義角色和權(quán)限
monitorRole readonly //角色和權(quán)限 controlRole readwritejmxremote.password,jmxremote.password.template:映射角色和密碼,從template copy 一個(gè)文件,
monitorRole QED //角色和密碼 controlRole R&D通知消息(Advisory Messages)
實(shí)現(xiàn)了ActiveMQ的broker上各種操作的記錄跟蹤和通知,實(shí)時(shí)的知道broker上:
這個(gè)機(jī)制是ActiveMQ對(duì)JMS協(xié)議的重要補(bǔ)充,也是基于JMS實(shí)現(xiàn)的ActiveMQ的可管理性的一部分。多個(gè)ActiveMQ的相互協(xié)調(diào)和互操作的基礎(chǔ)設(shè)置。
通知消息投遞到以 ActiveMQ.Advisory前綴命名的topic。
啟用通知消息:
<broker xmlns="http://activemq.org/config/1.0" useJmx="true" brokerName="localhost" dataDirectory="${activemq.base}/data" advisorySupport="true"> <!-- 默認(rèn)值:true,表示啟用 --><destinationPolicy><policyMap><policyEntries><policyEntry topic=">"sendAdvisoryIfNoConsumers="true"/> <!-- destination沒(méi)有消費(fèi)者則發(fā)送通知消息,每條消息都有通知消息 --></policyEntries></policyMap></destinationPolicy><!-- The transport connectors ActiveMQ will listen to --><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" /></transportConnectors> </broker>示例:
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination) MessageConsumer consumer = session.createConsumer(advisoryDestination); consumer.setMessageListener(this);public void onMessage(Message msg){if(msg instanceof ActiveMQMessage) {try {ActiveMQMessage aMsg = (ActiveMQMessage)msg;ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();} catch(JMSException e) {log.error("Failed to process message: " + msg);}} }Tools
命令行工具
#start $ cd apache-activemq-5.3.0 $ ./bin/activemq#start $ ./bin/activemq-admin start#stop $ ./bin/activemq-admin stop $ ./bin/activemq-admin stop --jmxurl service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi --jmxdomain my-broker#獲取所有 broker ./bin/activemq-admin list#查詢broker ./bin/activemq-admin query -QQueue=*#browse destination ./bin/activemq-admin browse --amqurl tcp://localhost:61616 JOBS.deleteCommand Agent
JConsole
Web Console
Logging
配置文件:conf/log4j.properties
log4j.rootLogger=INFO, stdout, out log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.springframework=WARN log4j.logger.org.apache.xbean.spring=WARN客戶端配置:
log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.springframework=WARN log4j.logger.org.apache.xbean.spring=WARN log4j.logger.org.apache.activemq.transport.failover.FailoverTransport=DEBUG log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUGLogging Interceptor:
<plugins> <loggingBrokerPlugin/> </plugins>參考
https://activemq.apache.org/features
https://activemq.apache.org/jmx.html#JMX
prefetch:https://activemq.apache.org/what-is-the-prefetch-limit-for
通知消息:https://activemq.apache.org/advisory-message
https://activemq.apache.org/mdc-logging
總結(jié)
以上是生活随笔為你收集整理的Activemq-In-action(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Activemq-In-action(二
- 下一篇: JMX笔记整理