日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

activeMQ使用总结

發布時間:2025/3/19 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activeMQ使用总结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這幾天在看異步消息,想到了jms,想到了activeMQ。下面是使用記錄:

activeMQ可以直接官網上下到。

理解:activeMQ實現了一個隊列,提供了tcp、socket等多種連接方式,通過java 的JMS進行發送消息和監聽消息。

? ? ? ? ? ? 消息分為兩種:queue(隊列)、topic(主題),隊列是點對點的生產者、消費者模式,主題是發布、訂閱模式。

開發與學習過程:

1、網上查找activeMQ和JMS相關資料,大致了解了上面理解的內容。

2、快速的進行開發,集成spring,因為集成spring把監聽、隊列等jms組件都定義好,更加方便學習。

spring集成jms組件包括:

?jmsTemplate:

<!-- 同步和異步消息jmsTemplate -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="asyJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="asyConnectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="asyConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
</bean>

?隊列:

<!-- 消息回復隊列 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="response_queue"/></bean>

監聽:

<!-- 回復消息監聽器 -->
<bean id="responseQueueListener" class="utry.jms.listener.ResponseQueueListener"/>
<!-- 回復對應的監聽容器 -->
<bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="responseQueue"/>
<property name="messageListener" ref="responseQueueListener"/>
</bean>


發送:

public void sendMsg(String destinationName, Serializable obj,
boolean asyncSend) {
if (asyncSend) {
asyJmsTemplate.convertAndSend(destinationName, obj);
} else {
jmsTemplate.convertAndSend(destinationName, obj);
}
}

3、分析監聽器類別并選取自己需要的監聽器類別:

監聽器是執行消息處理的最終要方法,JMS提供了MessageListener接口,實現MessageListener接口并實現OnMessage方法,即可實現消息的監聽。

但是,這種方式并沒有返回一個處理結束的結果,這是就考慮到了spring提供的SessionAwareMessageListener接口,實現SessionAwareMessageListener接口并實現OnMessage方法,即可實現監聽,但是不同的是,OnMessag有兩個參數TextMessage?message,?Session?session:通過session可是實現回復該線程? ? ? ? ?MessageProducer?producer?=?session.createProducer(destination); ?? ? ? ? Message?textMessage?=?session.createTextMessage("ConsumerSessionAwareMessageListener。。。"); ?producer.send(textMessage); ?

上面這種方式明顯已經可以實現回復信息,但是相對于MessageListenerAdapter還是略顯粗暴,MessageListenerAdapter直接實現了以上兩個接口,可以直接通過它選擇監聽方法、操作方法,綁定回復隊列。然后DefaultMessageListenerContainer設置messageListener為當前adapter就ok了。

<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"><property name="delegate" ref="consumerMessageListener"/><property name="defaultListenerMethod" value="receiveMessage"/><property name="defaultResponseDestination" ref="responseQueue"/> </bean><!-- 消息監聽適配器對應的監聽容器 --> <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="queueDestination"/><property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 --> </bean> 4、動態添加監聽創建隊列:

在真正項目使用時,并不會簡單意義上的在配置文件中固定幾個隊列和幾個監聽。所以要提供接口供開發者進行調用:

注冊監聽:

public void receiveMsg(String destinationName, IMessageListener listener){
Hashtable<String, Class<IMessageListener>> cache=JmsCache.getCache();
if(cache.containsKey(destinationName)){
System.out.println("監聽已經存在");
}else{
cache.put(destinationName, (Class<IMessageListener>) listener.getClass());
System.out.println("注冊監聽");
//監聽容器
? ? DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
? ? //獲取adapter
? ? JmsMessageListenerAdapter messageListener=new JmsMessageListenerAdapter(listener);
? ? MessageListenerAdapter messageListenerAdapter=messageListener.getMessageListenerAdapter();
? ??
? ? //設置監聽容器
? ? jmsContainer.setMessageListener(messageListenerAdapter);
? ? jmsContainer.setConnectionFactory(connectionFactory);
? ? jmsContainer.setDestinationName(destinationName);
? ? jmsContainer.setSessionTransacted(false);
? ? jmsContainer.initialize();
? ? jmsContainer.start();
}
}

adapter實現類

?//MessageListenerAdapter
? ? private MessageListenerAdapter messageListenerAdapter=new MessageListenerAdapter();
? ? //監聽器
? ? private IMessageListener delegate;
? ? //實現方法
? ? private final String methodName="executeMessage";
? ? //監聽回復地址:responseQueue
? ? private final String defaultdestination="response_queue";


? ? /**
? ? ?* 默認無參構造
? ? ?*/
? ? public JmsMessageListenerAdapter(){}


? ? /**
? ? ?* 構造方法
? ? ?* @param delegate
? ? ?* 備注:responseQueue是為默認監聽
? ? ?*/
? ? public JmsMessageListenerAdapter(IMessageListener delegate){
? ? ? ? this.delegate=delegate;
? ? }


? ? /**
? ? ?* 獲取MessageListenerAdapter
? ? ?* @return
? ? ?*/
? ? public MessageListenerAdapter getMessageListenerAdapter(){
? ? ? ? messageListenerAdapter.setDelegate(delegate);
? ? ? ? messageListenerAdapter.setDefaultListenerMethod(methodName);
? ? ? ? messageListenerAdapter.setDefaultResponseQueueName(defaultdestination);
? ? ? ? return messageListenerAdapter;
? ? }


發送消息:

public void sendMsg(String destinationName, Serializable obj,
boolean asyncSend) {
if (asyncSend) {
asyJmsTemplate.convertAndSend(destinationName, obj);
} else {
jmsTemplate.convertAndSend(destinationName, obj);
}
}

總結

以上是生活随笔為你收集整理的activeMQ使用总结的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。