日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Activemq-In-action(二)

發布時間:2024/4/13 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Activemq-In-action(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 第一部分 消息系統和Activemq
    • JMS
      • JMS概念
      • JMS消息
        • Message消息頭(Header)
        • Message消息體(Body)
        • **Message消息屬性**
  • 第二部分 配置Activemq
    • 第4章 Connecting to ActiveMQ
      • Connector URIs
      • Configuring Transport Connectors
        • Using Network Protocols
          • TCP
          • NIO (New I/O API Protocol)
          • UDP
          • SSL
          • HTTP/HTTPS
          • Virtual Machine Protocol
      • Configuring Network Connectors
        • Defining Static Networks
          • static protocol
          • Failover Protocol
        • Defining Dynamic networks
          • Multicast Protocol
          • Discovery Protocol
          • Peer Protocol
          • Fanout Protoco
    • 第5章 消息持久化
      • 消息存儲
      • 消息存儲實現
        • AMQ
          • AMQ存儲內核
          • AMQ存儲目錄結構
          • AMQ配置
          • AMQ使用場景
        • KahaDB
          • KahaDB配置
          • KahaDB使用場景
        • JDBC
          • JDBC Schema
          • JDBC配置
          • JDBC使用場景
          • 與ActiveMQ Journal一起使用
        • Memory
          • Memory配置
      • Caching Messages in the Broker for Consumers
        • 消息緩存工作原理
        • Subscription Recovery Policies(訂閱恢復策略)
          • Fixed Size Subscription Recovery Policy(固定大小)
          • Fixed Count Subscription Recovery Policy
          • Query Based Subscription Recovery Policy
          • Timed Subscription Recovery Policy
          • Last Image Subscription Recovery Policy
          • No Subscription Recovery Policy
          • RetainedMessageSubscriptionRecoveryPolicy
    • Activemq的安全體系
      • Basic Security Concepts
      • Authentication
        • 配置Simple Authentication Plugin
        • 配置JAAS Plugin
      • Authorization
        • Operation Level Authorization
        • Message Level Authorization
        • Broker Level Operations
        • 自定義Plugin
      • 參考

第一部分 消息系統和Activemq

JMS

JMS概念

  • JMS Client:純JAVA 發送和接收消息 的客戶端
  • Non-JMS Client:使用JMS Provider的本地API 代替JMS 發送和接收消息的客戶端。
  • JMS Producer
  • JMS Consumer
  • JMS Provider :純JAVA實現JMS接口 的應用
  • JMS Message
  • JMS Domains:消息系統的2種模式,包括點對點、發布/訂閱
  • Administered Objects
    ? Connection Factory
    ? Destination
public interface MessageProducer {void setDisableMessageID(boolean value) throws JMSException;boolean getDisableMessageID() throws JMSException;void setDisableMessageTimestamp(boolean value) throws JMSException;boolean getDisableMessageTimestamp() throws JMSException;void setDeliveryMode(int deliveryMode) throws JMSException;int getDeliveryMode() throws JMSException;void setPriority(int defaultPriority) throws JMSException;int getPriority() throws JMSException;void setTimeToLive(long timeToLive) throws JMSException;long getTimeToLive() throws JMSException;Destination getDestination() throws JMSException;void close() throws JMSException;void send(Message message) throws JMSException;void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;void send(Destination destination, Message message) throws JMSException;void send( Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException; } public interface MessageConsumer {String getMessageSelector() throws JMSException;MessageListener getMessageListener() throws JMSException;void setMessageListener(MessageListener listener) throws JMSException;Message receive() throws JMSException;Message receive(long timeout) throws JMSException;Message receiveNoWait() throws JMSException;void close() throws JMSException; }

JMS消息

一個消息有三個主要部分:

  • 消息頭(Header,必須):包含用于識別和為消息尋找路由的操作設置,所有類型的這部分格式都是一樣的。
  • 消息體(Body,可選、一個):指我們具體需要消息傳輸的內容,允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。
  • 消息屬性(Properties,可選、一組):按類型可以分為應用設置的屬性,標準屬性和消息中間件定義的屬性,包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。

Message消息頭(Header)

屬性名稱說明設置者
JMSDestination消息發送的目的地,是一個Topic或Queuesend
JMSDeliveryMode消息的發送模式,分為NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的send
JMSMessageID消息ID,需要以ID:開頭send
JMSTimestamp消息發送時的時間,也可以理解為調用send()方法時的時間,而不是該消息發送完成的時間send
JMSCorrelationID關聯的消息ID,這個通常用在需要回傳消息的時候client
JMSReplyTo消息回復的目的地,其值為一個Topic或Queue, 這個由發送者設置,但是接收者可以決定是否響應client
JMSRedelivered消息是否重復發送過,如果該消息之前發送過,那么這個屬性的值需要被設置為true, 客戶端可以根據這個屬性的值來確認這個消息是否重復發送過,以避免重復處理。Provider
JMSType由消息發送者設置的個消息類型,代表消息的結構,有的消息中間件可能會用到這個,但這個并不是是批消息的種類,比如TextMessage之類的client
JMSExpiration消息的過期時間,以毫秒為單位,根據定義,它應該是timeToLive的值再加上發送時的GMT時間,也就是說這個指的是過期時間,而不是有效期send
JMSPriority消息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先發送send

系統提供的標準頭信息一共有10個屬性,其中有6個是由send方法在調用時設置的,有三個是由客戶端設置的,還有一個是由消息中間件設置的。

注意:這里的client不是指消費者,而是指使用JMS的客戶端,即開發者所寫的應用程序,即在生產消息時,這三個屬性是可以由應用程序來設定的,而其它的header要么由消息中間件設置,要么由發送方法來決定,開發者即使設置了,也是無效的。

Message消息體(Body)

注意:發送和接受的消息體類型必須保持一一對應。

五中消息體格式

序號消息體類型說明
1TextMessage(文本消息)編碼字符串。對于外發消息,字符串在由目標對象給定的字符集中進行編碼。缺省情況下使用 UTF8 編碼(UTF8 編碼從消息的第一個字符開始;開頭處無長度字段)。但是,可以指定 用于 JMS 的 IBM? MQ 類 支持的任何其他字符集。此類字符集主要在將消息發送到非 JMS 應用程序時使用。
如果字符集是雙字節集(包括 UTF16),那么目標對象的整數編碼規范可確定字節順序。
使用消息本身中指定的字符集及編碼來解釋入局消息。這些規范在最后一個 IBM MQ 頭中,如果沒有頭,那么在 MQMD 中。對于 JMS 消息,最后一個頭通常為 MQRFH2。
2MapMessage(鍵值對消息)消息體包含了一系列的名字-值對.名字是Strings,而值則是Java primitive 類型.消息體中的條目可以被enumerator按照順序訪問,也可以自由訪問.條目的順序沒 有定義.
3ObjectMessage(對象消息)是 Java? Runtime 以正常方式進行序列化的對象。
4BytesMessage(bytes消息)缺省情況下,BytesMessage 是 JMS 1.0.2 規范及關聯 Java 文檔所定義的一系列字節。
對于由應用程序本身組合的外發消息,目標對象的編碼屬性可用于覆蓋消息中所含的整數和浮點字段的編碼。例如,可以請求以 S/390 格式而非 IEEE 格式存儲浮點值。
使用消息本身中指定的數字編碼來解釋入局消息。此規范在最后一個 IBM MQ 頭中,如果沒有頭,那么在 MQMD 中。對于 JMS 消息,最后一個頭通常為 MQRFH2。
如果收到 BytesMessage,并且在不進行修改的情況下重新發送,那么消息主體將按照其接收的方式逐字節進行傳輸。目標對象的編碼屬性對主體無任何影響。可以在 BytesMessage 中明確發送的唯一的類似字符串的實體是 UTF8 字符串。它采用 Java UTF8 格式編碼,并以雙字節長度字段開頭。目標對象的字符集屬性對外發 BytesMessage 編碼無任何影響。入局 IBM MQ 消息中的字符集值對于將此消息解釋為 JMS BytesMessage 無任何影響。
非 Java 應用程序不太可能能夠識別 Java UTF8 編碼。因此,對于要發送包含文本數據的 BytesMessage 的 JMS 應用程序,應用程序本身必須將其字符串轉換為字節數組,并將這些字節數組寫入 BytesMessage。
5StreamMessage(流消息)StreamMessage 與映射消息類似,但無元素名稱

Message消息屬性

屬性說明
JMSXUserID發送消息的用戶識別,發送時提供商設置
JMSXappID發送消息的應用標識,發送時提供商設置
JMSXdeliveryCount轉發消息重試次數:從1開始,發送方提供商設置
JMSXGroupID消息所在消息組的用戶標識,由客戶端設置
JMSXGroupSeq組內消息的序號,從1開始.由客戶端設置
JMSXProducerTEID產生消息的事務的事務表示,發送方提供商設置
JMSConsumerTXID消費消息的事務的事務表示,接收方提供設置
JMSXRevTimestampJMS轉發消息到消費者的事件,接收方提供設置
JMState假設有個消息倉庫,它存儲每個消息的單獨拷貝,從原始消息被發送時開始,狀態有1(等待),2(準備),3(到期),4(保留),由于狀態和生產者和消費者無關,所以它不是由他們提供,它只和倉庫查找消息相關,因此JMS沒有提供這中API,由提供商設置

第二部分 配置Activemq

第4章 Connecting to ActiveMQ

ActiveMQ提供connectors連接機制用于客戶端連接brokers,可用于broker-to-broker。客戶端可以使用多種協議連接broker。

Connector URIs

ActiveMQ使用 low-level connectors進行連接。

格式:

schema://path?query #exsample tcp://localhost:61614?trace=true

Configuring Transport Connectors

配置文件:conf/activemq.xml

<!-- The transport connectors ActiveMQ will listen to,name必須唯一 --> <transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"discoveryUri="multicast://default"/><transportConnector name="ssl" uri="ssl://localhost:61617"/><transportConnector name="stomp" uri="stomp://localhost:61613"/><transportConnector name="xmpp" uri="xmpp://localhost:61222"/> </transportConnectors>

可以通過以下方式創建連接

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

生產者

Publisher publisher = new Publisher(args[0]); String[] topics = new String[args.length - 1]; publisher.sendMessage(topics);

消費者

Consumer consumer = new Consumer(args[0]); String[] topics = new String[args.length - 1]; for (String stock : topics) {Destination destination = consumer.getSession().createTopic("STOCKS." + stock);MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);messageConsumer.setMessageListener(new Listener()); }}

Using Network Protocols

TCP

消息序列化為字節序列,通過 wire protocol來定義,activemq默認使用的 wire protocol 是 OpenWire。

TCP transport connector 用于轉換 message 序列化為 OpenWire 格式并通過 TCP network。

#格式 tcp://hostname:port?key=value&key=value #默認端口 61616

配置:

<transportConnectors><transportConnectorname="tcp"uri="tcp://localhost:61616?trace=true" /> </transportConnectors>

options:

trace:記錄所有的操作

優點

  • 高效
  • 可用性:常用協議
  • 可靠性:
NIO (New I/O API Protocol)

格式:

nio://hostname:port?key=value

配置:

<transportConnectors><transportConnectorname="tcp"uri="tcp://localhost:61616?trace=true" /><transportConnectorname="nio"uri="nio://localhost:61618?trace=true" /> </transportConnectors>
UDP

格式:

udp://hostname:port?key=value

配置:

<transportConnectors><transportConnectorname="tcp"uri="tcp://localhost:61616?trace=true"/><transportConnectorname="udp"uri="udp://localhost:61618?trace=true"/> </transportConnectors>
SSL

格式:

ssl://hostname:port?key=value

配置:

<transportConnectors><transportConnectorname="tcp"uri="tcp://localhost:61616?trace=true"/><transportConnectorname="ssl"uri="ssl://localhost:61617?trace=true"/> </transportConnectors>

SSL協議需要設置certificate。

JSEE支持2種文件格式用于存儲keys和certificates。第一種 keystores,存儲私有數據,憑據。受信certificate存儲在truststores。

默認存儲位置:${ACTIVEMQ_HOME}/conf/, broker.ks 是默認broker certificate的keystore。broker.ts 是默認truststores。

使用SSL

//系統屬性 javax.net.ssl.keyStore //定義一個keystore javax.net.ssl.keyStorePassword //定義keystore password javax.net.ssl.trustStore // 定義 truststoremvn \ -Djavax.net.ssl.keyStore=${ACTIVEMQ_HOME}/conf/client.ks \ -Djavax.net.ssl.keyStorePassword=password \ -Djavax.net.ssl.trustStore=${ACTIVEMQ_HOME}/conf/client.ts \ exec:java -Dexec.mainClass=org.apache.activemq.book.ch3.Publisher \ -Dexec.args="ssl://localhost:61617 CSCO ORCL"

keytool工具可用于生成keystore和certificate

keytool -genkey -alias broker -keyalg RSA -keystore mybroker.kskeytool -export -alias broker -keystore mybroker.ks -file mybroker_certkeytool -genkey -alias client -keyalg RSA -keystore myclient.ks
HTTP/HTTPS

格式:

http://hostname:port?key=value https://hostname:port?key=value

配置:

<transportConnectors><transportConnectorname="tcp"uri="tcp://localhost:61616?trace=true"/><transportConnectorname="http"uri="http://localhost:8080?trace=true"/> </transportConnectors>

客戶端使用http協議,必須加入以下jar包

$ACTIVEMQ_HOME/lib/optional/activemq-optional-<version>.jar $ACTIVEMQ_HOME/lib/optional/commons-httpclient-<version>.jar $ACTIVEMQ_HOME/lib/optional/xstream-<version>.jar $ACTIVEMQ_HOME/lib/optional/xmlpull-<version>.jar

maven 依賴:

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-optional</artifactId><version>5.2.0</version> </dependency>
Virtual Machine Protocol

使用場景:activemq 嵌入一個java 應用中。不通過網絡訪問,只通過方法調用

格式:

vm://brokerName?key=value #brokerName必須唯一,可以創建多個 #示例 vm://broker1?marshal=false&broker.persistent=false#配置額外的connector vm:broker:(transportURI,network:networkURI)/brokerName?key=value #vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=false

Configuring Network Connectors

Network Connectors 用于配置 brokers 之間的網絡拓撲結構。

<!-- The store and forward broker networks ActiveMQ will listen to --> <networkConnectors> <!-- by default just auto discover the other brokers --><networkConnector name="default-nc" uri="multicast://default"/> <!-- <networkConnector name="host1 and host2"uri="static://(tcp://host1:61616,tcp://host2:61616)"/> --> </networkConnectors>

discovery: 指發現其他broker的過程。

Defining Static Networks

配置靜態網絡,前提是知道所有broker的uri。

static protocol

格式:

static:(uri1,uri2,uri3,...)?key=value

配置:

<networkConnectors><networkConnector name="local network"uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/> </networkConnectors>

Failover Protocol

故障轉移

格式:

failover:(uri1,...,uriN)?key=value 或者 failover:uri1,...,uriN

Defining Dynamic networks

Multicast Protocol

廣播協議。Group address: 224.0.0.0 to 239.255.255.255,

配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data"><networkConnectors><networkConnector name="default-nc" uri="multicast://default"/></networkConnectors><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/></transportConnectors> </broker>

缺點:

  • 自動發現,容易加入不應該加入的broker
  • 建議謹慎使用

阻止自動broker發現:

當在一個團隊環境,使用ActiveMq的默認配置,有可能導致 一個Activemq實例去消費另一個實例的消息,有一些建議用于避免此場景:

移除Openwire transport connector的discoveryUri選項,此選項讓其他broker發現自己。

<transportConnector name="openwire" uri="tcp://localhost:61616" />

移除 default-nc network connector,此選項用于 發現其他 broker。

<!--networkConnector name="default-nc" uri="multicast://default"/-->

為broker確定唯一的名稱:默認為localhost,

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1234" dataDirectory="${activemq.base}/data">
Discovery Protocol

discovery transport connector 是客戶端 的廣播功能,行為上類似failover協議,區別是可以通過廣播發現broker,并隨機選擇一個連接。

格式:

discovery:(discoveryAgentURI)?key=value
Peer Protocol

前述方式連接內嵌broker是比較笨重的,Activemq提供了 Peer Protocol可以很容易連接內嵌brokers。應用內的消費者和生產者訪問內嵌的broker,不同application內的brokers之間可以通過network串聯。

格式:

peer://peergroup/brokerName?key=value

Fanout Protoco

此協議允許客戶端同時連接多個broker,并且復制操作到這些broker。

格式:

fanout:(fanoutURI)?key=value

fanoutURI,可以是static URI 或 multicast URI

第5章 消息持久化

Activemq不僅支持JMS規范的數據傳遞持久化或非持久化模式,還支持數據恢復。

Activemq支持插件策略支持消息存儲,以及為in-memory方式提供存儲選項:文件模式關系型數據庫模式

一旦消息被消費者消費,并且確認Q,就會被刪除。

消息存儲

queue和topic的存儲模式不完全相同,涉及到point-to-point 和publish/subscribe的相關領域

queue采用FIFO模式直接存儲,消息只會發給一個consumer,并且消費確認只會,直接刪除。

topic模式,每個consumer都可以獲取message的副本,一條消息只會存儲在一個broker上。訂閱者只能接收訂閱之后的消息。

? topic存儲

每個消息存儲實現都必須支持queue和topic 的持久化。

消息存儲實現

  • AMQ:為了性能和可用性的默認實現。
  • KahaDB:實現可擴展性和可恢復。
  • JDBC:

AMQ

消息存儲的默認實現,基于文件系統的事務性存儲,經過性能調校,用于快速消息存儲。目標:易于使用以及盡可能快。

在 activemq.xml 中使用 持久化適配器 來配置。

... <broker persistent="true" xmlns="http://activemq.apache.org/schema/core"> ...<persistenceAdapter><amqPersistenceAdapter/></persistenceAdapter> ... </broker> ...

嵌入Activemq,可以采用以下方式:

public class EmbeddedBrokerUsingAMQStoreExample {public void main(String[] args) throws Exception {BrokerService broker = new BrokerService();PersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();·persistenceFactory.setMaxFileLength(1024*16);persistenceFactory.setPersistentIndex(true);persistenceFactory.setCleanupInterval(10000);broker.setPersistenceFactory(persistenceFactory);?broker.addConnector("tcp://localhost:61616");1broker.start();} }
AMQ存儲內核

AMQ是最快的實現。基于高優化的消息id索引以及內存緩存。

AMQ的主要組件:

  • The Journal:

    由消息的滾動日志、命令(事務邊界和消息刪除)組成,存儲在確定長度的文件中。當文件長度達到指定大小,會創建一個新的文件。

    數據文件中的所有消息都有引用計數,一旦數據文件的每條消息都不再被引用,則會被刪除并且歸檔。消息僅會被追加在當前數據文件的末尾,因此速度最快。

  • The Cache

    一旦消息寫入journal,則被緩存,緩存持續更新消息id的引用以及消息在journal中的位置。這個過程類似checkpoint,一旦引用存儲被更新,則消息能夠安全的從緩存中移除。通過checkpoinInterval配置,或者當緩存達到限制也會主動觸發。

  • The Reference Store

    journal中消息的引用存儲以及消息id的索引存儲。可以指定索引儲存方式,一種是hash文件,一種是hashmap內存(有數據限制)

AMQ存儲目錄結構

Activemq啟動時會為每個broker創建一個子目錄,所以要保證broker的名字唯一。

  • data directory

    包含索引和消息引用,如果broker未正常關閉,則在恢復時會刪除目錄并重建。可以通過在啟動broker前刪除目錄觸發強制恢復。

  • state directory

    topic消費者的信息,Journal本身并不保存這些信息,當恢復時會獲取這些信息重建數據庫。

  • lock file

    保證同一時刻僅有一個broker訪問此數據。通常用于 stand-by模式。

  • tmp-storage directory

    存儲非持久化消息。

  • kr-store

    AMQ的referrence(index),默認使用Kaha數據庫,

  • journal

    journal的數據文件和 一個data-control 文件(元數據信息),數據文件通過引用計數,如果所有信息都已消費,則可以刪除和歸檔。

  • achive

    僅當 archiving 被啟用才創建。journal的數據文件不會被刪除,而是移動到此處。當重新放到一個新的journal目錄時,會自動重放消息。

AMQ配置
<?xml version="1.0" encoding="UTF-8"?> <beans><broker xmlns="http://activemq.apache.org/schema/core"><persistenceAdapter><amqPersistenceAdapterdirectory="target/Broker2-data/activemq-data"syncOnWrite="true"indexPageSize="16kb"indexMaxBinSize="100"maxFileLength="10mb" /></persistenceAdapter></broker> </beans>

options:

屬性名稱默認值注釋
directoryactivemq-data數據及日志存儲目錄
useNIOtrueuse NIO to write messages to the data logs
syncOnWritefalse同步寫磁盤
maxFileLength32mb文件最大長度
persistentIndextrue是否持久化index
maxCheckpointMessageAddSize4kbthe maximum number of messages to keep in a transaction before automatically committing
cleanupInterval30000time (ms) before checking for a discarding/moving message data logs that are no longer used
indexBinSize1024default number of bins used by the index. The bigger the bin size - the better the relative performance of the index
indexKeySize96the size of the index key - the key is the message id
indexPageSize16kbthe size of the index page - the bigger the page - the better the write performance of the index
directoryArchivearchivethe path to the directory to use to store discarded data logs
archiveDataLogsfalseif true data logs are moved to the archive directory instead of being deleted
AMQ使用場景

AMQ是默認存儲,在易用性和性能之間做了平衡。數據存儲嵌入了broker中。它是為可靠性持久化的事務型journal的組合,組合了高性能的索引。

AMQ的可配置性讓activemq適用于大部分場景,從高吞吐量應用到大數據量應用。

KahaDB

KahaDB用于突破 AMQ的限制。AMQ為每個index(每個destination都有一個index)使用2個分開的文件,如果Activemq非正常關閉,則可能恢復緩慢。原因是需要broker拷貝所以消息日志重建索引。

KahaDB為所有index適用同一個事物日志文件,并且所有的destination都使用同一個index。

KahaDB的組件很類似AMQ:

  • A cache
  • Reference Indexes
  • A message journal

所有index的更新都記錄在一個log文件中。KahaDB使用B-Tree存儲結構。

KahaDB配置
<broker brokerName="broker" persistent="true" useShutdownHook="false"> ...<persistenceAdapter><kahaDB directory="activemq-data" journalMaxFileLength="32mb"/></persistenceAdapter> ... </broker>

屬性:

屬性名稱默認值注釋
archiveCorruptedIndexfalseIf true, 啟動時歸檔損壞的index(非刪除)
archiveDataLogsfalseIf true, 從data log 移動消息到 歸檔目錄,而不是刪除
checkForCorruptJournalFilesfalseIf true, 啟動時檢查損壞journal日志,并嘗試修復。
checkpointInterval5000Time (ms) before check-pointing the journal.
checksumJournalFilestrueCreate a checksum for a journal file. The presence of a checksum is required in order for the persistence adapter to be able to detect corrupt journal files. Before ActiveMQ 5.9.0: the default is false.
cleanupInterval30000The interval (in ms) between consecutive checks that determine which journal files, if any, are eligible for removal from the message store. An eligible journal file is one that has no outstanding references.
compactAcksAfterNoGC10From ActiveMQ 5.14.0: when the acknowledgement compaction feature is enabled this value controls how many store GC cycles must be completed with no other files being cleaned up before the compaction logic is triggered to possibly compact older acknowledgements spread across journal files into a new log file. The lower the value set the faster the compaction may occur which can impact performance if it runs to often.
compactAcksIgnoresStoreGrowthfalseFrom ActiveMQ 5.14.0: when the acknowledgement compaction feature is enabled this value controls whether compaction is run when the store is still growing or if it should only occur when the store has stopped growing (either due to idle or store limits reached). If enabled the compaction runs regardless of the store still having room or being active which can decrease overall performance but reclaim space faster.
concurrentStoreAndDispatchQueuestrueEnable the dispatching of Queue messages to interested clients to happen concurrently with message storage.
concurrentStoreAndDispatchTopicsfalseEnable the dispatching of Topic messages to interested clients to happen concurrently with message storage. Enabling this property is not recommended.
directoryactivemq-dataThe path to the directory to use to store the message store data and log files.
directoryArchivenullDefine the directory to move data logs to when they all the messages they contain have been consumed.
enableAckCompactiontrueFrom ActiveMQ 5.14.0: this setting controls whether the store will perform periodic compaction of older journal log files that contain only Message acknowledgements. By compacting these older acknowledgements into new journal log files the older files can be removed freeing space and allowing the message store to continue to operate without hitting store size limits.
enableIndexWriteAsyncfalseIf true, the index is updated asynchronously.
enableJournalDiskSyncstrueEnsure every journal write is followed by a disk sync (JMS durability requirement). This property is deprecated as of ActiveMQ 5.14.0. From ActiveMQ 5.14.0: see journalDiskSyncStrategy.
ignoreMissingJournalfilesfalseIf true, reports of missing journal files are ignored.
indexCacheSize10000Number of index pages cached in memory.
indexDirectoryFrom ActiveMQ 5.10.0: If set, configures where the KahaDB index files (db.data and db.redo) will be stored. If not set, the index files are stored in the directory specified by the directory attribute.
indexWriteBatchSize1000Number of indexes written in a batch.
journalDiskSyncInterval1000Interval (ms) for when to perform a disk sync when journalDiskSyncStrategy=periodic. A sync will only be performed if a write has occurred to the journal since the last disk sync or when the journal rolls over to a new journal file.
journalDiskSyncStrategyalwaysFrom ActiveMQ 5.14.0: this setting configures the disk sync policy. The list of available sync strategies are (in order of decreasing safety, and increasing performance): always Ensure every journal write is followed by a disk sync (JMS durability requirement). This is the safest option but is also the slowest because it requires a sync after every message write. This is equivalent to the deprecated property enableJournalDiskSyncs=true. periodic The disk will be synced at set intervals (if a write has occurred) instead of after every journal write which will reduce the load on the disk and should improve throughput. The disk will also be synced when rolling over to a new journal file. The default interval is 1 second. The default interval offers very good performance, whilst being safer than never disk syncing, as data loss is limited to a maximum of 1 second’s worth. See journalDiskSyncInterval to change the frequency of disk syncs. never A sync will never be explicitly called and it will be up to the operating system to flush to disk. This is equivalent to setting the deprecated property enableJournalDiskSyncs=false. This is the fastest option but is the least safe as there’s no guarantee as to when data is flushed to disk. Consequently message loss can occur on broker failure.
journalMaxFileLength32mbA hint to set the maximum size of the message data logs.
maxAsyncJobs10000The maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers).
preallocationScopeentire_journalFrom ActiveMQ 5.14.0: this setting configures how journal data files are preallocated. The default strategy preallocates the journal file on first use using the appender thread. entire_journal_async will use preallocate ahead of time in a separate thread. none disables preallocation. On SSD, using entire_journal_async avoids delaying writes pending preallocation on first use. Note: on HDD the additional thread contention for disk has a negative impact. Therefore use the default.
preallocationStrategysparse_fileFrom ActiveMQ 5.12.0: This setting configures how the broker will try to preallocate the journal files when a new journal file is needed. sparse_file - sets the file length, but does not populate it with any data. os_kernel_copy - delegates the preallocation to the Operating System. zeros - each preallocated journal file contains nothing but 0x00 throughout.
storeOpenWireVersion11Determines the version of OpenWire commands that are marshaled to the KahaDB journal. Before ActiveMQ 5.12.0: the default value is 6. Some features of the broker depend on information stored in the OpenWire commands from newer protocol revisions and these may not work correctly if the store version is set to a lower value. KahaDB stores from broker versions greater than 5.9.0 will in many cases still be readable by the broker but will cause the broker to continue using the older store version meaning newer features may not work as intended. For KahaDB stores that were created in versions prior to ActiveMQ 5.9.0 it will be necessary to manually set storeOpenWireVersion="6" in order to start a broker without error.
KahaDB使用場景

使用AMQ的場景基本上都可以使用KahaDB,高性能需求或者一個broker內的destination小于等于500,則AMQ更好。

KahaDB不兼容AMQ,也沒有工具可轉換。僅在新的broker啟動時可以選擇哪個。

JDBC

默認JDBC driver是Apache Derby。但其他關系型數據庫仍然支持。

支持的數據庫(未列舉完):

  • Apache Derby

  • MySQL

  • PostgreSQL

  • Oracle

  • SQLServer

  • Sybase

  • Informix

  • MaxDB

JDBC Schema

消息存儲表(ACTIVEMQ_MSGS)

消息確認表(ACTIVEMQ_ACKS)

JDBC配置
<beans><broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core"><persistenceAdapter><jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#mysql-ds" /></persistenceAdapter></broker> </beans> <?xml version="1.0" encoding="UTF-8"?> <beans><broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core"><persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds"/></persistenceAdapter></broker><bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/><property name="username" value="activemq"/><property name="password" value="activemq"/><property name="maxActive" value="200"/><property name="poolPreparedStatements" value="true"/></bean> </beans>
JDBC使用場景

master/slave模式。

與ActiveMQ Journal一起使用

Memory

Memory配置
<?xml version="1.0" encoding="UTF-8"?> <beans><broker brokerName="test-broker"persistent="false" xmlns="http://activemq.apache.org/schema/core"><transportConnectors><transportConnector uri="tcp://localhost:61635"/></transportConnectors></broker> </beans>

嵌入式Activemq配置

import org.apache.activemq.broker.BrokerService; public void createEmbeddedBroker() throws Exception {BrokerService broker = new BrokerService();//configure the broker to use the Memory Storebroker.setPersistent(false);//Add a transport connectorbroker.addConnector("tcp://localhost:61616");//now start the brokerbroker.start(); }

Caching Messages in the Broker for Consumers

消息緩存用于

消息緩存工作原理

Activemq為每個topic都緩存消息在內存中,不支持臨時topic和通知topic。由于queue的通用操作時hold發送的每條信息,因此消息緩存不適用。

僅當消息分發給topic消費者時,并且消息是可追溯的,永不面向持久topic訂閱者,消息才會緩存。

topic消費者通過設置屬性consumer.retroactive來標記是個可追溯的消費。

import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; public void createRetroactiveConsumer() throws JMSException{ConnectionFactory fac = new ActiveMQConnectionFactory();Connection connection = fac.createConnection();connection.start();Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");MessageConsumer consumer = session.createConsumer(topic); }

在broker端,消息緩存由Destination Policy(subscriptionRecoveryPolicy.)控制,默認策略:FixedSizedSubscriptionRecoveryPolicy.

Subscription Recovery Policies(訂閱恢復策略)

生產者在向某個topic發送了多條消息后,這個時候非持久訂閱者才訂閱,那么它是不能獲取之前生產者發送的信息的。或者,由于網絡問題,非持久類型的消費者處于非活躍狀態,無法接收到生產者發送的消息。使用消息恢復策略,可以解決上面的問題。ActiveMQ目前支持一個定時或固定大小的恢復緩沖區,在你連接到broker后,在一段時間內的消息會重新發送給訂閱者。

Fixed Size Subscription Recovery Policy(固定大小)

可以保留固定字節的消息。

<policyEntry topic=">"><subscriptionRecoveryPolicy><fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/></subscriptionRecoveryPolicy> </policyEntry> 屬性默認值描述
maximumSize6553600最大內存(B)
useSharedBuffertrue所有topic共享
Fixed Count Subscription Recovery Policy

保留固定數量的消息。

<policyEntry topic=">"><subscriptionRecoveryPolicy><fixedCountSubscriptionRecoveryPolicy maximumSize="100"/></subscriptionRecoveryPolicy> </policyEntry> 屬性默認值描述
maximumSize100消息數量
Query Based Subscription Recovery Policy

根據查詢機制使用回溯。

屬性默認值描述
querynull通過屬性查詢
<policyEntry topic=">"><subscriptionRecoveryPolicy><queryBasedSubscriptionRecoveryPolicy query="Color='red' AND Name='tom'"/></subscriptionRecoveryPolicy> </policyEntry>
Timed Subscription Recovery Policy

保留指定時間內的消息

<policyEntry topic=">"><subscriptionRecoveryPolicy><timedSubscriptionRecoveryPolicy recoverDuration="60000"/></subscriptionRecoveryPolicy> </policyEntry> 屬性默認值描述
recoverDuration60000最大保留時間(ms)
Last Image Subscription Recovery Policy

保留最后一條記錄

<policyEntry topic=">"><subscriptionRecoveryPolicy><lastImageSubscriptionRecoveryPolicy/></subscriptionRecoveryPolicy> </policyEntry>
No Subscription Recovery Policy

禁用回溯,這是默認配置。

<policyEntry topic=">"><subscriptionRecoveryPolicy><noSubscriptionRecoveryPolicy/></subscriptionRecoveryPolicy> </policyEntry>
RetainedMessageSubscriptionRecoveryPolicy

保留ActiveMQ.Retain屬性值為true的最后1條消息。

注意:需要設置retroactive屬性為true

<policyEntry topic=">"><subscriptionRecoveryPolicy><retainedMessageSubscriptionRecoveryPolicy/></subscriptionRecoveryPolicy> </policyEntry>

Activemq的安全體系

Basic Security Concepts

Authentication

所有安全概念在Activemq中都是以插件方式實現。Activemq有2種認證插件:

  • Simple authentication plugin:在xml直接配置
  • JAAS authentication plugin

配置Simple Authentication Plugin

<broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" dataDirectory="${activemq.base}/data"><transportConnectors><transportConnector name="openwire"uri="tcp://localhost:61616" /></transportConnectors><plugins><simpleAuthenticationPlugin><users><authenticationUserusername="admin"password="password"groups="admins,publishers,consumers"/><authenticationUserusername="publisher"password="password"groups="publishers,consumers"/><authenticationUserusername="consumer"password="password"groups="consumers"/><authenticationUserusername="guest"password="password"groups="guests"/></users></simpleAuthenticationPlugin></plugins> </broker>

使用憑據

private String username = "publisher"; private String password = "password"; public Publisher() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);//connection = factory.createConnection(username, password);connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(null); }

配置JAAS Plugin

Authorization

Operation Level Authorization

3種操作級別:

  • read
  • write
  • admin
... <plugins><jaasAuthenticationPlugin configuration="activemq-domain" /><authorizationPlugin><map><authorizationMap><authorizationEntries><authorizationEntry topic=">"read="admins"write="admins"admin="admins" /><authorizationEntry topic="STOCKS.>"read="consumers"write="publishers"admin="publishers" />#<authorizationEntry topic="STOCKS.ORCL"read="guests" /><authorizationEntry topic="ActiveMQ.Advisory.>"read="admins,publishers,consumers,guests"write="admins,publishers,consumers,guests"admin="admins,publishers,consumers,guests" /></authorizationEntries></authorizationMap></map></authorizationPlugin> </plugins> ...

Message Level Authorization

控制在消息級別

實現一個MessageAuthorizationPolicy類

public class AuthorizationPolicy implements MessageAuthorizationPolicy {private static final Log LOG = LogFactory.getLog(AuthorizationPolicy.class);public boolean isAllowedToConsume (ConnectionContext context, Message message) {LOG.info(context.getConnection().getRemoteAddress());if (context.getConnection().getRemoteAddress().startsWith("/127.0.0.1")) {return true;} else {return false;}} }

配置使用此類

<messageAuthorizationPolicy> <bean class="org.apache.activemq.book.ch5.AuthorizationPolicy" xmlns="" /> </messageAuthorizationPolicy>

Broker Level Operations

自定義Plugin

參考

JAAS:http://java.sun.com/products/jaas/reference/docs/index.html

Login Module:http://java.sun.com/javase/6/docs/api/javax/security/auth/spi/LoginModule.html

discovery:http://activemq.apache.org/discovery-transport-reference.html

peer:http://activemq.apache.org/peer-transport-reference.html

fanout:http://activemq.apache.org/fanout-transport-reference.html

存儲:https://activemq.apache.org/persistence

amq:https://activemq.apache.org/amq-message-store

JDBC:https://activemq.apache.org/jdbc-support

TCP :http://activemq.apache.org/tcp-transport-reference.html

總結

以上是生活随笔為你收集整理的Activemq-In-action(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。