javascript
Spring JMS:处理事务中的消息
1.引言
這篇文章將向您展示在使用JMS異步接收消息期間,使用者執(zhí)行過程中的錯(cuò)誤如何導(dǎo)致消息丟失。 然后,我將解釋如何使用本地事務(wù)解決此問題。
您還將看到這種解決方案在某些情況下可能導(dǎo)致消息重復(fù)(例如,當(dāng)它將消息保存到數(shù)據(jù)庫中,然后偵聽器執(zhí)行失敗時(shí))。 發(fā)生這種情況的原因是因?yàn)镴MS事務(wù)獨(dú)立于其他事務(wù)資源(如DB)。 如果您的處理不是冪等的,或者您的應(yīng)用程序不支持重復(fù)消息檢測(cè),那么您將不得不使用分布式事務(wù)。
分布式事務(wù)超出了此職位的范圍。 如果您對(duì)處理分布式事務(wù)感興趣,可以閱讀這篇有趣的文章。
我已經(jīng)實(shí)現(xiàn)了一個(gè)再現(xiàn)以下情況的測(cè)試應(yīng)用程序:
生產(chǎn)者將消息發(fā)送到隊(duì)列:
使用者從隊(duì)列中檢索消息并進(jìn)行處理:
- 該應(yīng)用程序的源代碼可以在github上找到。
2.測(cè)試應(yīng)用
測(cè)試應(yīng)用程序執(zhí)行兩個(gè)測(cè)試類TestNotTransactedMessaging和TestTransactedMessaging 。 這些類都將執(zhí)行上述三種情況。
讓我們看看在沒有事務(wù)的情況下執(zhí)行應(yīng)用程序時(shí)的配置。
app-config.xml
應(yīng)用程序配置。 基本上,它會(huì)在指定的軟件包內(nèi)進(jìn)行檢查以自動(dòng)檢測(cè)應(yīng)用Bean:生產(chǎn)者和使用者。 它還配置了將在其中存儲(chǔ)處理后的通知的內(nèi)存數(shù)據(jù)庫。
<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/> </bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" /> </jdbc:embedded-database>notx-jms-config.xml
配置JMS基礎(chǔ)結(jié)構(gòu),該基礎(chǔ)結(jié)構(gòu)是:
- 經(jīng)紀(jì)人聯(lián)系
- JmsTemplate
- 將通知發(fā)送到的隊(duì)列
- 偵聽器容器,它將發(fā)送通知給偵聽器以處理它們
生產(chǎn)者僅使用jmsTemplate發(fā)送通知。
@Component("producer") public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());} }偵聽器負(fù)責(zé)從隊(duì)列中檢索通知,并將其存儲(chǔ)到數(shù)據(jù)庫中。
@Component("notificationProcessor") public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}} ... }當(dāng)id = 1的通知到達(dá)時(shí), checkPreprocessException方法將拋出運(yùn)行時(shí)異常。 這樣,在將消息存儲(chǔ)到數(shù)據(jù)庫之前,我們將導(dǎo)致錯(cuò)誤。
如果到達(dá)id = 2的通知, checkPostprocessException方法將引發(fā)異常,從而在將其存儲(chǔ)到數(shù)據(jù)庫之后立即引起錯(cuò)誤。
getDeliveryNumber方法返回發(fā)送消息的次數(shù)。 這僅適用于事務(wù),因?yàn)樵趥陕犉魈幚硎?dǎo)致回滾之后,代理將嘗試重新發(fā)送消息。
最后, saveToDB方法非常明顯。 它將通知存儲(chǔ)到數(shù)據(jù)庫。
您始終可以通過單擊本文開頭的鏈接來檢查此應(yīng)用程序的源代碼。
3.測(cè)試沒有交易的消息接收
我將啟動(dòng)兩個(gè)測(cè)試類,一個(gè)不包含事務(wù),另一個(gè)在本地事務(wù)中。 這兩個(gè)類都擴(kuò)展了一個(gè)基類,該基類加載了常見的應(yīng)用程序上下文并包含一些實(shí)用程序方法:
@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"}) @DirtiesContext public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});} }實(shí)用方法說明如下:
- getSavedNotifications :返回存儲(chǔ)到數(shù)據(jù)庫的通知數(shù)。 我使用了queryForObject方法,因?yàn)樽园姹?.2.2開始建議使用該方法。 queryForInt方法已被棄用。
- getMessagesInQueue :允許您檢查指定隊(duì)列中哪些消息仍在等待處理。 對(duì)于此測(cè)試,我們有興趣知道仍有多少通知等待處理。
現(xiàn)在,讓我向您展示第一個(gè)測(cè)試的代碼( TestNotTransactedMessaging )。 此測(cè)試啟動(dòng)本文開頭指示的3種情況。
@Test public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications()); }4,執(zhí)行測(cè)試
好的,讓我們執(zhí)行測(cè)試,看看結(jié)果是什么:
testCorrectMessage輸出:
Producer|Sending notification | Id: 0 NotificationProcessor|Received notification | Id: 0 | Redelivery: 1 TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 1此處沒有問題,因?yàn)橄⒁颜_接收并存儲(chǔ)到數(shù)據(jù)庫,所以隊(duì)列為空。
testFailedAfterReceiveMessage輸出:
Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 0由于它在事務(wù)外部執(zhí)行,因此使用確認(rèn)模式(默認(rèn)為自動(dòng))。 這意味著一旦調(diào)用onMessage方法并因此將其從隊(duì)列中刪除,就認(rèn)為該消息已成功傳遞。 因?yàn)閭陕犉髟趯⑾⒋鎯?chǔ)到數(shù)據(jù)庫之前失敗,所以我們丟失了消息!
testFailedAfterProcessingMessage輸出:
2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2 2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1在這種情況下,在執(zhí)行失敗之前,該消息已從隊(duì)列(AUTO_ACKNOWLEDGE)中刪除并存儲(chǔ)到DB。
5,添加本地交易
通常,我們不允許像測(cè)試的第二種情況那樣丟失消息,因此我們要做的是在本地事務(wù)中調(diào)用偵聽器。 所做的更改非常簡(jiǎn)單,并不意味著從我們的應(yīng)用程序中修改一行代碼。 我們只需要更改配置文件。
為了測(cè)試這3種涉及事務(wù)的情況,我將以下配置文件notx-jms-config.xml替換為:
tx-jms-config.xml
首先,我添加了在發(fā)生回滾的情況下進(jìn)行的重新傳遞的數(shù)量(由于偵聽器執(zhí)行中的錯(cuò)誤導(dǎo)致):
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property> </bean>接下來,我指示偵聽器將在事務(wù)內(nèi)執(zhí)行。 這可以通過修改偵聽器容器定義來完成:
<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/> </jms:listener-container>這將導(dǎo)致在本地JMS事務(wù)中執(zhí)行對(duì)偵聽器的每次調(diào)用。 收到消息后,事務(wù)將開始。 如果偵聽器執(zhí)行失敗,則消息接收將回滾。
這就是我們要做的一切。 讓我們使用此配置啟動(dòng)測(cè)試。
6,測(cè)試交易中的消息接收
來自TestTransactedMessaging類的代碼實(shí)際上與先前的測(cè)試相同。 唯一的區(qū)別是,它向DLQ(死信隊(duì)列)添加了查詢。 在事務(wù)內(nèi)執(zhí)行時(shí),如果回退消息接收,則代理會(huì)將消息發(fā)送到此隊(duì)列(在所有重新傳遞失敗之后)。
我跳過了成功接收的輸出,因?yàn)樗粫?huì)帶來任何新的變化。
testFailedAfterReceiveMessage輸出:
Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 2 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. ... java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 5 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 1 TestTransactedMessaging|Total items in DB: 0如您所見,第一次接收失敗,并且代理嘗試將其重新發(fā)送四次(如maximumRedeliveries屬性中所示)。 由于情況持續(xù)存在,因此消息已發(fā)送到特殊DLQ隊(duì)列。 這樣,我們不會(huì)丟失消息。
testFailedAfterProcessingMessage輸出:
Producer|Sending notification | Id: 2 NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message NotificationProcessor|Received notification | Id: 2 | Redelivery: 2 TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 0 TestTransactedMessaging|Total items in DB: 2在這種情況下,這是發(fā)生的情況:
7,結(jié)論
將本地事務(wù)添加到消息接收中可避免丟失消息。 我們必須考慮的是,可能會(huì)出現(xiàn)重復(fù)的消息,因此我們的偵聽器將不得不檢測(cè)到它,否則我們的處理必須是冪等的才能再次進(jìn)行處理。 如果這不可能,我們將不得不進(jìn)行分布式事務(wù),因?yàn)樗鼈冎С稚婕安煌Y源的事務(wù)。
翻譯自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html
總結(jié)
以上是生活随笔為你收集整理的Spring JMS:处理事务中的消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎样给无线路由器安装软件如何给路由器安装
- 下一篇: 错误处理在Spring Integrat