日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring JMS:处理事务中的消息

發(fā)布時(shí)間:2023/12/3 javascript 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring JMS:处理事务中的消息 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.引言

這篇文章將向您展示在使用JMS異步接收消息期間,使用者執(zhí)行過程中的錯(cuò)誤如何導(dǎo)致消息丟失。 然后,我將解釋如何使用本地事務(wù)解決此問題。

您還將看到這種解決方案在某些情況下可能導(dǎo)致消息重復(fù)(例如,當(dāng)它將消息保存到數(shù)據(jù)庫中,然后偵聽器執(zhí)行失敗時(shí))。 發(fā)生這種情況的原因是因?yàn)镴MS事務(wù)獨(dú)立于其他事務(wù)資源(如DB)。 如果您的處理不是冪等的,或者您的應(yīng)用程序不支持重復(fù)消息檢測(cè),那么您將不得不使用分布式事務(wù)。

分布式事務(wù)超出了此職位的范圍。 如果您對(duì)處理分布式事務(wù)感興趣,可以閱讀這篇有趣的文章。

我已經(jīng)實(shí)現(xiàn)了一個(gè)再現(xiàn)以下情況的測(cè)試應(yīng)用程序:

  • 發(fā)送和接收消息:使用者將處理收到的消息,并將其存儲(chǔ)到數(shù)據(jù)庫中。

    生產(chǎn)者將消息發(fā)送到隊(duì)列:

    使用者從隊(duì)列中檢索消息并進(jìn)行處理:

  • 消息處理之前發(fā)生錯(cuò)誤:使用者檢索消息,但是在將消息存儲(chǔ)到DB之前執(zhí)行失敗。

  • 處理消息后發(fā)生錯(cuò)誤:使用者檢索消息,將其存儲(chǔ)到DB,然后執(zhí)行失敗。

    • 該應(yīng)用程序的源代碼可以在github上找到。

    2.測(cè)試應(yīng)用

    測(cè)試應(yīng)用程序執(zhí)行兩個(gè)測(cè)試類TestNotTransactedMessaging和TestTransactedMessaging 。 這些類都將執(zhí)行上述三種情況。

    讓我們看看在沒有事務(wù)的情況下執(zhí)行應(yīng)用程序時(shí)的配置。

    app-config.xml

    應(yīng)用程序配置。 基本上,它會(huì)在指定的軟件包內(nèi)進(jìn)行檢查以自動(dòng)檢測(cè)應(yīng)用Bean:生產(chǎn)者和使用者。 它還配置了將在其中存儲(chǔ)處理后的通知的內(nèi)存數(shù)據(jù)庫。

    <context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/> </bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" /> </jdbc:embedded-database>

    notx-jms-config.xml

    配置JMS基礎(chǔ)結(jié)構(gòu),該基礎(chǔ)結(jié)構(gòu)是:

    • 經(jīng)紀(jì)人聯(lián)系
    • JmsTemplate
    • 將通知發(fā)送到的隊(duì)列
    • 偵聽器容器,它將發(fā)送通知給偵聽器以處理它們
    <!-- Infrastructure --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/> </bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/> </bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="incomingQueue"/> </bean><!-- Destinations --> <bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="incoming.queue"/> </bean><!-- Listeners --> <jms:listener-container connection-factory="connectionFactory"><jms:listener ref="notificationProcessor" destination="incoming.queue"/> </jms:listener-container>

    生產(chǎn)者僅使用jmsTemplate發(fā)送通知。

    @Component("producer") public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());} }

    偵聽器負(fù)責(zé)從隊(duì)列中檢索通知,并將其存儲(chǔ)到數(shù)據(jù)庫中。

    @Component("notificationProcessor") public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}} ... }

    當(dāng)id = 1的通知到達(dá)時(shí), checkPreprocessException方法將拋出運(yùn)行時(shí)異常。 這樣,在將消息存儲(chǔ)到數(shù)據(jù)庫之前,我們將導(dǎo)致錯(cuò)誤。

    如果到達(dá)id = 2的通知, checkPostprocessException方法將引發(fā)異常,從而在將其存儲(chǔ)到數(shù)據(jù)庫之后立即引起錯(cuò)誤。

    getDeliveryNumber方法返回發(fā)送消息的次數(shù)。 這僅適用于事務(wù),因?yàn)樵趥陕犉魈幚硎?dǎo)致回滾之后,代理將嘗試重新發(fā)送消息。

    最后, saveToDB方法非常明顯。 它將通知存儲(chǔ)到數(shù)據(jù)庫。

    您始終可以通過單擊本文開頭的鏈接來檢查此應(yīng)用程序的源代碼。

    3.測(cè)試沒有交易的消息接收

    我將啟動(dòng)兩個(gè)測(cè)試類,一個(gè)不包含事務(wù),另一個(gè)在本地事務(wù)中。 這兩個(gè)類都擴(kuò)展了一個(gè)基類,該基類加載了常見的應(yīng)用程序上下文并包含一些實(shí)用程序方法:

    @ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"}) @DirtiesContext public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});} }

    實(shí)用方法說明如下:

    • getSavedNotifications :返回存儲(chǔ)到數(shù)據(jù)庫的通知數(shù)。 我使用了queryForObject方法,因?yàn)樽园姹?.2.2開始建議使用該方法。 queryForInt方法已被棄用。
    • getMessagesInQueue :允許您檢查指定隊(duì)列中哪些消息仍在等待處理。 對(duì)于此測(cè)試,我們有興趣知道仍有多少通知等待處理。

    現(xiàn)在,讓我向您展示第一個(gè)測(cè)試的代碼( TestNotTransactedMessaging )。 此測(cè)試啟動(dòng)本文開頭指示的3種情況。

    @Test public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications()); }

    4,執(zhí)行測(cè)試

    好的,讓我們執(zhí)行測(cè)試,看看結(jié)果是什么:

    testCorrectMessage輸出:

    Producer|Sending notification | Id: 0 NotificationProcessor|Received notification | Id: 0 | Redelivery: 1 TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 1

    此處沒有問題,因?yàn)橄⒁颜_接收并存儲(chǔ)到數(shù)據(jù)庫,所以隊(duì)列為空。

    testFailedAfterReceiveMessage輸出:

    Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 0

    由于它在事務(wù)外部執(zhí)行,因此使用確認(rèn)模式(默認(rèn)為自動(dòng))。 這意味著一旦調(diào)用onMessage方法并因此將其從隊(duì)列中刪除,就認(rèn)為該消息已成功傳遞。 因?yàn)閭陕犉髟趯⑾⒋鎯?chǔ)到數(shù)據(jù)庫之前失敗,所以我們丟失了消息!

    testFailedAfterProcessingMessage輸出:

    2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2 2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

    在這種情況下,在執(zhí)行失敗之前,該消息已從隊(duì)列(AUTO_ACKNOWLEDGE)中刪除并存儲(chǔ)到DB。

    5,添加本地交易

    通常,我們不允許像測(cè)試的第二種情況那樣丟失消息,因此我們要做的是在本地事務(wù)中調(diào)用偵聽器。 所做的更改非常簡(jiǎn)單,并不意味著從我們的應(yīng)用程序中修改一行代碼。 我們只需要更改配置文件。

    為了測(cè)試這3種涉及事務(wù)的情況,我將以下配置文件notx-jms-config.xml替換為:

    tx-jms-config.xml

    首先,我添加了在發(fā)生回滾的情況下進(jìn)行的重新傳遞的數(shù)量(由于偵聽器執(zhí)行中的錯(cuò)誤導(dǎo)致):

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property> </bean>

    接下來,我指示偵聽器將在事務(wù)內(nèi)執(zhí)行。 這可以通過修改偵聽器容器定義來完成:

    <jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/> </jms:listener-container>

    這將導(dǎo)致在本地JMS事務(wù)中執(zhí)行對(duì)偵聽器的每次調(diào)用。 收到消息后,事務(wù)將開始。 如果偵聽器執(zhí)行失敗,則消息接收將回滾。

    這就是我們要做的一切。 讓我們使用此配置啟動(dòng)測(cè)試。

    6,測(cè)試交易中的消息接收

    來自TestTransactedMessaging類的代碼實(shí)際上與先前的測(cè)試相同。 唯一的區(qū)別是,它向DLQ(死信隊(duì)列)添加了查詢。 在事務(wù)內(nèi)執(zhí)行時(shí),如果回退消息接收,則代理會(huì)將消息發(fā)送到此隊(duì)列(在所有重新傳遞失敗之后)。

    我跳過了成功接收的輸出,因?yàn)樗粫?huì)帶來任何新的變化。

    testFailedAfterReceiveMessage輸出:

    Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 2 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. ... java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 5 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 1 TestTransactedMessaging|Total items in DB: 0

    如您所見,第一次接收失敗,并且代理嘗試將其重新發(fā)送四次(如maximumRedeliveries屬性中所示)。 由于情況持續(xù)存在,因此消息已發(fā)送到特殊DLQ隊(duì)列。 這樣,我們不會(huì)丟失消息。

    testFailedAfterProcessingMessage輸出:

    Producer|Sending notification | Id: 2 NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message NotificationProcessor|Received notification | Id: 2 | Redelivery: 2 TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 0 TestTransactedMessaging|Total items in DB: 2

    在這種情況下,這是發(fā)生的情況:

  • 偵聽器檢索到消息
  • 它將消息存儲(chǔ)到數(shù)據(jù)庫
  • 偵聽器執(zhí)行失敗
  • 代理重新發(fā)送消息。 由于情況已解決,因此偵聽器將消息再次存儲(chǔ)到DB。 該消息已重復(fù)。
  • 7,結(jié)論

    將本地事務(wù)添加到消息接收中可避免丟失消息。 我們必須考慮的是,可能會(huì)出現(xiàn)重復(fù)的消息,因此我們的偵聽器將不得不檢測(cè)到它,否則我們的處理必須是冪等的才能再次進(jìn)行處理。 如果這不可能,我們將不得不進(jìn)行分布式事務(wù),因?yàn)樗鼈冎С稚婕安煌Y源的事務(wù)。

    參考: Spring JMS:在XavierPadró的Blog博客上處理來自JCG合作伙伴 Xavier Padro的事務(wù)內(nèi)的消息 。

    翻譯自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html

    總結(jié)

    以上是生活随笔為你收集整理的Spring JMS:处理事务中的消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。