Sping+ActiveMQ整合
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/spring-activemq-quick-start/
通過前一篇《ActiveMQ簡述》大概對ActiveMQ有了一個大概的認識,本篇所闡述的是如何通過Spring繼承ActiveMQ進而更有效、更靈活的運用ActiveMQ.
Spring和ActiveMQ整合需要在項目中包含以下這幾個jar包(缺一不可):activeio-core-3.1.4.jar,activemq-all-5.13.2.jar,activemq-pool-5.13.2.jar,commons-pool2-2.4.2.jar,這些jar可以在ActiveMQ的安裝包中的/lib/optional/下找到,或者從這里下載。
##配置ConnectionFactory
ConnectionFactory是用于產生到JMS服務器的鏈接的,Spring為我們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對于建立JMS服務器鏈接的請求會一直返回同一個鏈接,并且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,可以緩存Session, MessageProducer和MessageConsumer。
Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正產生到JMS服務器鏈接的ConnectionFactory還得是JMS服務廠商提供的,并且需要把它注入到Spring提供的ConnectionFactory中,這里使用ActiveMQ提供的ConnectionFactory,所以定義如下(10.10.195.187是博主的測試ip地址):
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory"/></bean>ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory可以用來將Connection, Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。當使用PooledConnectionFactory時,我們在定義一個ConnectionFactory時這樣定義:
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" ><property name="connectionFactory" ref="targetConnectionFactory" /><property name="maxConnections" value="10"/></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="poolConnectionFactory"/></bean>這里也可以去掉spring提供的SingleConnectionFactory,類似這樣:
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" ><property name="connectionFactory" ref="targetConnectionFactory" /><property name="maxConnections" value="10"/></bean>##配置消息發送(生產)者
配置好ConnectionFactory之后我們就需要配置生產者。生產者負責生產消息并發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對于消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。
spring配置文件中添加:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="poolConnectionFactory"/></bean>在Java相關處理文件中添加(這里用的是@Inject注解,當然也可以用@Autowired):
@Inject@Named("jmsTemplate")private JmsTemplate jmsTemplate;這樣就可以通過jmsTemplate對象來處理相關信息,譬如:
jmsTemplate.convertAndSend("sqlQueue",sql);真正利用JmsTemplate進行消息發送的時候,我們需要知道消息發送的目的地,即Destination。在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標志而已。當我們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行諸如,defaultDestinationName對于的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另一個就是支持訂閱-發布模式的ActiveMQTopic。
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>sqlQueue</value></constructor-arg></bean><bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg><value>topic</value></constructor-arg></bean>對面上面的那個例子可以在Java程序中添加:
@Inject@Named("queueDestination")private Destination queueDestination;進而【jmsTemplate.convertAndSend(“sqlQueue”,sql);】可以改為【jmsTemplate.convertAndSend(queueDestination,sql);】
也可以這樣修改jmsTemplate:
或者:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestinationName" value="sqlQueue"/></bean>進而在java程序中這樣調用:jmsTemplate.convertAndSend(sql);
##配置消息接收(消費)者
生產者往指定目的地Destination發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,并把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對于消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽那個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。所以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪里監聽的ConnectionFactory,一個是表示監聽什么的Destination,一個是接收到消息以后進行消息處理的MessageListener.
Spring為我們聽過了兩種類型的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。MessageListenerContainer:SimpleMessageListenerContainer會在一開始的時候就創建一個會話Session和消費者Consumer,并且會適用標準的JMS的MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近于獨立的JMS規范,但一般不兼容J2EE的JMS限制。大多數情況下,我們還是使用DefaultMessageListenerContainer,跟MessageListenerContainer:SimpleMessageListenerContainer想比,它會動態的適應運行時的需求,并且能夠參與外部的事務管理。它很好的平衡了JMS提供者要求低,先進功能如事務參與和兼容J2EE環境。
spring配置文件中添加:
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>sqlQueue</value></constructor-arg></bean><bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="poolConnectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /></bean>其中的jmsQueueReceiver代碼如下:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;import org.springframework.stereotype.Component;@Component("jmsQueueReceiver") public class JmsQueueReceiver implements MessageListener {@Overridepublic void onMessage(Message messgae){if(messgae instanceof TextMessage){TextMessage textMessage = (TextMessage) messgae;try{String text = textMessage.getText();System.out.println("######################["+text+"]######################");}catch (JMSException e){e.printStackTrace();}}} }##事務管理
Spring提供了一個JmsTransactionManager用于對JMS ConnectionFactory做事務管理。這將允許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個ConnectionFactory/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,并對他們進行相應的操作。
在J2EE環境中,ConnectionFactory會池化Connection和Session,這樣這些資源將會在整個事務中被有效地重復利用。在一個獨立的環境中,使用Spring的SingleConnectionFactory時所有的事務將公用一個Connection,但是每個事務將保留自己獨立的Session.
JmsTemplate可以利用JtaTransactionManager和能夠進行分布式的JMS ConnectionFactory處理分布式事務。
在Spring整合JMS的應用中,如果我們要進行本地的事務管理的話非常簡單,只需要在定義對于的消息監聽容器時指定其sessionTransacted屬性為true(默認為false):
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /><property name="sessionTransacted" value="true"/></bean>這樣JMS在進行消息監聽的時候就會進行事務控制,當在接收消息時監聽器執行失敗時JMS就會對接收到的消息進行回滾。這里需要注意的是對于其他操作如數據庫等訪問不屬于該事務控制。
可以在JmsQueueReceiver中修改一下代碼從而檢測到發送異常時是否會進行事務回滾:
@Component("jmsQueueReceiver") public class JmsQueueReceiver implements MessageListener {@Overridepublic void onMessage(Message messgae){if(messgae instanceof TextMessage){TextMessage textMessage = (TextMessage) messgae;try{String text = textMessage.getText();System.out.println("######################["+text+"]######################");if(true){throw new RuntimeException("Error");}}catch (JMSException e){e.printStackTrace();}}} }你可以通過向Destination發送一條信息,通過JmsQueueReceiver處理后,發送異常進而事務混滾。可以通過http://10.10.195.187:8161/admin/queues.jsp查看相關信息(ip地址是博主的ActiveMQ服務器所在id,根據實際情況修改)。
如果想要接收消息和數據庫訪問處于同一事務中,那么我們就可以配置一個外部的事務管理同時配置一個支持外部事務管理的消息監聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分布式事務管理的消息監聽容器,我們可以配置一個JtaTransactionManager,當然底層的JMS ConnectionFactory需要能夠支持分布式事務管理,并正確地注冊我們的JtaTransactionManager。這樣消息監聽器進行消息接收和對應的數據庫訪問就會處于同一數據庫控制下,當消息接收失敗或數據庫訪問失敗都會進行事務回滾操作。
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /><property name="sessionTransacted" value="true"/><property name="transactionManager" ref="jtaTransactionManager"/></bean><bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>當給小時監聽容器指定了transactionManager時,消息監聽容器將忽略sessionTransacted的值。
到這里大概的Spring+ActiveMQ整合告一段落,所有代碼經博主親測,確保有效性及正確性。如有疑問,可在下方留言。
參考資料:
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/spring-activemq-quick-start/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Sping+ActiveMQ整合的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ActiveMQ简述
- 下一篇: 聊一聊ThreadLocal