日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入了解DefaultMessageListenerContainer

發布時間:2024/4/17 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入了解DefaultMessageListenerContainer 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

http://www.myexception.cn/ai/1252456.html

?

深入理解DefaultMessageListenerContainer

DefaultMessageListenerContainer是一個用于異步消息監聽的管理類。

DefaultMessageListenerContainer最簡單的實現邏輯,一個任務執行器,執行任務(即消息監聽)。

DefaultMessageListenerContainer實現的主要原理是,通過內部初始化建立的一個taskExecutor(默認是 SimpleAsyncTaskExecutor)用于執行消息監聽的任務(AsyncMessageListenerInvoker)。

這里默認的任務執行器是SimpleAsyncTaskExecutor,這個執行器的缺點是不會重用連接,也就是對于每個任務都需要新開啟一個線程,執行完任務后會關閉它。如果要優化的話可以考慮使用線程池。

消息監聽的任務被抽象成AsyncMessageListenerInvoker類,這個類實現了Runnable接口,內部run方法其實是通過不斷循環MessageConsumer.receive()方法來實現監聽。

?

事實上一個消費者對應了一個AsyncMessageListenerInvoker任務,每個任務需要一個單獨的線程去執行它。這個AsyncMessageListenerInvoker實例被放在了一個名為scheduledInvokers的set里面。

其實我們還有一個比較關心的地方是這個DefaultMessageListenerContainer緩不緩存 connection,session,consumer。因為它不像消息發送一樣可以使用PooledConnectionFactory或者CachingConnectionFactory。它是根據catchLevel屬性來決定是否緩存 connection,session,consumer。默認的catchLevel對應常量CATCH_AUTO=4,即由配置的外部事務管理器決定。 catchLevel級別分別是CATCH_NONE,CATCH_CONNECTION,CATCH_SESSION,CATCH_CONSUMER,CATCH_AUTO, 分別對應0,1,2,3,4。我試了下默認的CATCH_AUTO在沒有定義事務管理時值為 CATCH_CONSUMER,即3。

具體查看類中的方法:

publicvoid initialize(){
??? ??? // Adapt default cache level.
??? ??? if(this.cacheLevel == CACHE_AUTO){
??? ??? ??? this.cacheLevel =(getTransactionManager()!=null? CACHE_NONE : CACHE_CONSUMER);
??? ??? }


??? ??? // Prepare taskExecutor and maxMessagesPerTask.
??? ??? synchronized(this.lifecycleMonitor){
??? ??? ??? if(this.taskExecutor ==null){
??? ??? ??? ??? this.taskExecutor = createDefaultTaskExecutor();
??? ??? ??? }
??? ??? ??? elseif(this.taskExecutor instanceofSchedulingTaskExecutor&&
??? ??? ??? ??? ??? ((SchedulingTaskExecutor)this.taskExecutor).prefersShortLivedTasks()&&
??? ??? ??? ??? ??? this.maxMessagesPerTask ==Integer.MIN_VALUE){
??? ??? ??? ??? // TaskExecutor indicated a preference for short-lived tasks. According to
??? ??? ??? ??? // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
??? ??? ??? ??? // unless the user specified a custom value.
??? ??? ??? ??? this.maxMessagesPerTask =10;
??? ??? ??? }
??? ??? }

??? ??? // Proceed with actual listener initialization.
??? ??? super.initialize();
??? }

?

?

CATCH_NONE=0,表示不緩存JMS任何資源。

CATCH_CONNECTION=1,表示只緩存JMS的共享Connection。

CATCH_SESSION=2,表示緩存JMS的共享Connection和Session。

CATCH_CONSUMER=3,表示緩存JMS的共享Connection和Session還有MessageConsumer。

CATCH_AUTO=4,表示系統自動選擇一個合適的cacheLevel(根據事務管理策略)。

?

DefaultMessageListenerContainer會根據catchLevel來緩存共享connection,session, 及consumer。值為3的話就會緩存connection,session,及consumer,在初始化的時候就會調用父類 AbstractJmsListeningContainer的doStart()方法,判斷cacheLevel是否大于等于1,如果大于就創建一個 connection將放入成員變量sharedConnection中。

每個任務被執行的時候(即責任是監聽消息),會先去獲取connection,session及consumer(通過調用 initResourcesIfNecessary方法)就像我們自己最初實現一個簡單的客戶端消費者一樣。只不過這里會根據catchLevel來決定 是否緩存session及consumer。被緩存了的session及consumer放在對應的成員變量里面。

?接著任務會想要執行consumer.recieve方法,這之前肯定要獲取connection,session及consumer,如果已有 connection,session及consumer則獲取過來,如果沒有則通過配置的信息新建。執行完consumer.recieve后,會判斷 consumer.recieve返回的消息是否為空。

不為空則調用message對應的messageListner(之前我們在DefaultMessageListenerContainer中通過方法 setMessageListner設置的)的onMessage執行相應的邏輯,并設置這個任務的Idle為false,表明這個任務不是空閑的,然后 會調用方法判斷是否應該新建任務實例,這個受限于MaxConcurrentConsumers及IdleTaskExecutionLimit。為空則不需要特別處理,只需調用noMessageReceived方法將idle標記設為true。

任務執行完后,會在finally處釋放connection,session及consumer。這個是根據上述講的catchLevel來設置的。

?

? 繼承體系如下:

???

AbstractJmsListeningContainer提供了一個最上層最基礎的jms消息監聽管理類所應該有的方法。提供了start(啟動這個 管理類),stop,initialize(初始化這個管理類),establishSharedConnection等。

http://tech.techweb.com.cn/thread-535513-1-1.html 使用DefaultMessageListenerContainer作為消息接收器,典型的配置如下:
) z/ a) z* f& L9 e5 u: t2 _" {1 ^' s, T- z
<bean id="queueListenerContainer"
?class="org.springframework.jms.listener.DefaultMessageListenerContainer">?
??<property name="connectionFactory" ref="mqConnectionFactory" /> ' K7 [( R8 |2 I& I( u; j* l) j
? ?? ???<property name="destination" ref="queueIn" /><!-- 接收隊列 -->
?<property name="concurrentConsumers" value="3" /><!-- 控制同時啟幾個concurrent listener threads --> ' _2 @! f) [3 ]??I% l4 [
? ?? ???<property name="messageListener" ref="messageReceiver" /> 4 W1 b. O- i% O4 y4 U, c% x
? ?? ???<property name="transactionManager" ref="jmsTransactionManager" /> 5 K8 R3 x0 |; c4 }; I% z
? ?? ???<property name="sessionTransacted" value="true" /> 1 o??x- I4 A+ r1 L/ s
</bean>
?<bean id="jmsTransactionManager" ( N/ r+ H: d( i$ A" Y' t
? ?? ???class="org.springframework.jms.connection.JmsTransactionManager">
?<property name="connectionFactory" ref="mqConnectionFactory" /> & H7 r1 A0 j) C- A

?</bean>

總結

以上是生活随笔為你收集整理的深入了解DefaultMessageListenerContainer的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。