日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

学习笔记:ActiveMQ + SpringBoot 事务问题 序列化问题

發布時間:2023/12/31 javascript 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 学习笔记:ActiveMQ + SpringBoot 事务问题 序列化问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、docker 安裝 ActiveMQ

1.在docker環境中執行

// 搜索activemq鏡像 docker search activemq // 拉取activemq鏡像 docker pull webcenter/activemq // 查看拉取后的activemq鏡像 docker images // 創建數據文件夾和日志文件夾 mkdir -p ./activemq/soft/activemq mkdir -p ./activemq/soft/activemq/log // docker執行命令,名稱,后臺啟動,綁定端口,開機啟動,數據卷綁定 docker run --name=activemq -itd -p 8161:8161 -p 61616:61616 --restart=always -v /home/docker/activemq/soft/activemq:/data/activemq -v /home/docker/activemq/soft/activemq/log:/var/log/activemq webcenter/activemq:latest // 默認登陸用戶名密碼 用戶名密碼admin/admin

2.訪問頁面
訪問路徑:http://ip:8161。

二、ActiveMQ介紹

1.ActiveMQ基于JMS協議,組成部分:

JMS Provider:生產者,支持事務來保證發送可靠性。
JMS Message:JMS 的消息,主要類型有Text,Object,Map,Bytes,Stream五種類型。由消息頭,消息屬性,消息體組成。
JMS Consumer:消費者,支持事務和確認機制來保證消費可靠性。同步:使用recive()方法阻塞接受消息(客戶端拉)。異步:使用監聽方式接受消息(服務器推)。
JMS Domains:消息傳遞域,支持P2P(點對點傳輸),只存在一個隊列,生產者發送到隊列,消費者從隊列中消費;支持pub/sub訂閱消費模式,生產者發送到Topic,生產者訂閱topic進行消費(消費者在訂閱之前是收不到消息的。在訂閱之后在線的狀態可以收到消息,如果想離線后依然能接收到消息,需要設置成持久訂閱)。

Connection Factory:連接工廠,創建連接,通過連接可以創建session對話,再創建生產者和消費者。(springboot中整合為jmstemplate模板,可以直接生產和消費消息);
JMS Connection:封裝了客戶與 JMS 提供者之間的一個虛擬的連接。
JMS Session:是生產者和消費者的一個單線程上下文。會話用于創建消息生產者(Producer)、消息消費者(Consumer),和消息(Message)等。會話提供了一個事務性的上下文,一組發送和接收被組合到了一個原子操作中。

2.消息結構

1.消息頭

屬性含義
Destination目的地,主要是queue和topic
DeliveryMode傳遞模式,在send或者jmstemplate中設置。分為持久模式和非持久模式
Expiration消息過期/到期時間,在send或者jmstemplate中設置
Priority消息優先級,有 0-9 十個級別,0-4是普通消息,5-9是加急消息。JMS 不要求 JMS Provider 嚴格按著十個優先級發送消息,但必須保證加急消息要先于普通消息到達。默認是第4級
MessageID由生產者自動分配,唯一的ID,以ID:開頭
Timestamp生產者發送消息到消息call或者return返回的時間差
JMSTypeJMS 消息類型的識別符
CorrelationIDJMS 相關性 id,由客戶端設置。用來連接到另外一個消息,典型的應用是在回復消息中連接到原消息
ReplyTo回復,由客戶端設置。提供本信息回復消息的目的地址
Redelivered重發,由JMS Provider(供應商)設置

2.消息體:JMS API 定義了 5 種消息體格式,也叫消息類型,可以使用不同的形式發送接收數據,并可以兼容現有的消息格式。包括:TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。它們都是 Message 接口的子類。

3.消息屬性:自定義屬性,JMS定義的屬性,供應商特點的屬性。

三、ActiveMQ + SpringBoot的使用

ActiveMQ配置

spring:activemq:broker-url: tcp://192.168.99.100:61616user: adminpassword: adminin-memory: false # 基于外部mq模式pool:enable: true #開啟鏈接池max-connections: 10 #最大鏈接數 package com.zwfw.framework.activemq.config;import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.RedeliveryPolicy; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType;import javax.jms.DeliveryMode; import javax.jms.Session;@Configuration public class ActivemqConfig {@Value("${spring.activemq.broker-url}")private String brokerUrl;@Value("${spring.activemq.user}")private String username;@Value("${spring.activemq.password}")private String password;/*** 消息重發策略配置*/@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次嘗試重新發送失敗后,增長這個等待時間redeliveryPolicy.setUseExponentialBackOff(true);//重發次數,默認為6次-設置為3次redeliveryPolicy.setMaximumRedeliveries(3);//重發時間間隔單位毫秒,默認為1秒redeliveryPolicy.setInitialRedeliveryDelay(1000L);//第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒redeliveryPolicy.setBackOffMultiplier(2);// 是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);// 設置重發最大拖延時間-1表示無延遲限制redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}/*** 消息工廠配置*/@Beanpublic ActiveMQConnectionFactory activeMqConnectionFactory() {ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);activeMqConnectionFactory.setRedeliveryPolicy(redeliveryPolicy());return activeMqConnectionFactory;}@Bean(name = "jmsTemplate")public JmsTemplate jmsTemplate() {JmsTemplate jmsTemplate = new JmsTemplate();// 設置連接工廠jmsTemplate.setConnectionFactory(activeMqConnectionFactory());//deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,默認falsejmsTemplate.setExplicitQosEnabled(true);//定義持久化后節點掛掉以后,重啟可以繼續消費 1表示非持久化,2表示持久化jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);/*** 如果不啟用事務,則會導致XA事務失效;* 作為生產者如果需要支持事務,則需要配置SessionTransacted為true*/jmsTemplate.setSessionTransacted(false);//消息的應答方式,需要手動確認,此時SessionTransacted必須被設置為false,且為Session.CLIENT_ACKNOWLEDGE模式/*** 當關閉事務時候,下面設置才有效* Session.AUTO_ACKNOWLEDGE 消息自動簽收* Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收* Session.DUPS_OK_ACKNOWLEDGE 不必必須簽收,消息可能會重復發送*/jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());return jmsTemplate;}/*** topic模式的ListenerContainer* topic下沒有消息回執一說,確認消息之存在queue模式* 瀏覽只是針對 Queue 的概念,Topic 沒有瀏覽。瀏覽是指獲取消息而消息依然保持在 broker 中,而消息的接收會把消息從 broker 中移除。*/@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerTopic() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setPubSubDomain(true);factory.setConnectionFactory(activeMqConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());return factory;}/*** queue模式的ListenerContainer* 監聽容器配置,使用jackson的消息轉換器* 1 不開啟事務,手動確認,自動確認* 2 開啟事務,是自動應答,當客戶端消費有異常拋出,會進行重試模式,按照上面重試配置次數重試后,如果還是失敗,則會進入死信隊列* @return*/@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();// 關閉事務factory.setSessionTransacted(false);// 設置手動確認,默認配置中Session是開啟了事務的,事務優先級大于客戶端確認,即使我們設置了手動Ack也是無效的factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);factory.setConnectionFactory(activeMqConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());return factory;}/*** queue模式的ListenerContainer* 監聽容器配置,使用自帶的消息轉換器*/@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueueNoConver() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();// 關閉事務factory.setSessionTransacted(false);// 設置手動確認,默認配置中Session是開啟了事物的,即使我們設置了手動Ack也是無效的factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);factory.setConnectionFactory(activeMqConnectionFactory());return factory;}/*** 自定義消息轉換器* @return*/@Beanpublic MessageConverter jacksonJmsMessageConverter() {MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();// MappingJackson2MessageConverter只支持TEXT和byte類型的轉換,// 見org.springframework.jms.support.converter.MappingJackson2MessageConverter.toMessageconverter.setTargetType(MessageType.TEXT);// 可以為任何字符,但必需要配置,在下文中的setTypeIdOnMessage方法中會用上converter.setTypeIdPropertyName("_type");return converter;} }

queue和topic配置

package com.zwfw.framework.activemq.queue;import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import javax.jms.Queue; import javax.jms.Topic;@Configuration public class QueueConfig {/*** 聲明普通隊列*/@Beanpublic Queue conmonQueue(){return new ActiveMQQueue("common.queue");}/*** 聲明延時隊列*/@Beanpublic Queue delayQueue(){return new ActiveMQQueue("delay.queue");}/*** 聲明廣播類型隊列*/@Beanpublic Topic topicQueue(){return new ActiveMQTopic("topic.queue");} }

上面是mq的基本配置,配置了幾個監聽容器:queue模式下的帶事務和不帶事務手動確認的容器和topic監聽的容器,這些容器在后續監聽類中配置用得上。

P2P模式(圖是借鑒來的):

前言:該模式,點對點傳輸。生產者傳輸消息到隊列,消費者從隊列中消費。生產者可配置事務,如果開啟事務發送,只有在commit之后,隊列中才會有消息入列,如果rollback則不會進入隊列;客戶端一般有兩種方式,一種是事務,第二種是確認機制:開啟事務,不需要設置確認機制(事務優先級大于確認機制,設置了手動確認等也無效),默認就是自動確認,當事務方法中拋出異常,則消息不會被消費,會進入重試,重試次數到了之后,會進入activemq的DLQ隊列(死信)。


生產者代碼和說明:

@RestController public class SendController {@Autowiredprivate Queue conmonQueue;@Autowiredprivate JmsTemplate jmsTemplate;/*** 單條數據發送,事務模式*/@RequestMapping("/commonQueue")public void commonQueue() throws InterruptedException {for (int i = 0; i < 20; i++) {Book book = new Book();book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);// 設置事務模式發送jmsTemplate.setSessionTransacted(true);jmsTemplate.convertAndSend(conmonQueue,book);if (i == 12) {throw new RuntimeException();}}}/*** 批量發送,事務模式*/@RequestMapping("/commonQueue/{num}")public void commonQueueTranscate(@PathVariable("num") Integer num) throws Exception {MessageProducer pd = null;Session session = null;Connection connection = null;try {ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();connection = connectionFactory.createConnection();connection.start();// 開啟事務,只能設置AUTO_ACKNOWLEDGE,其他模式無效且不受控制session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);pd = session.createProducer(conmonQueue);for (int i = 0; i < 20; i++) {Book book = new Book();book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);// 用內置的消息轉換器TextMessage message = session.createTextMessage(JSON.toJSONString(book));// 用jackson的消息轉化器 // Message message = jmsTemplate.getMessageConverter().toMessage(book, session);pd.send(message);System.out.println("send book" + i + " to queue");}// 測試判斷,偶數提交,奇數回滾if (num % 2 == 0) {session.commit();}else {session.rollback();}} catch (JMSException e) {throw new RuntimeException(e);}finally {pd.close();session.close();connection.close();}}}

事務:使用springboot整合后,基本上是使用jmstemplate模版進行消息的發送。在事務環境下,好像只能發送單條并且發送成功后確認單條數據。這樣數據量大的話感覺會影響效率。如果在批量發送環境下,用jmstemplate發送我還沒有找到合適的方法 ………所以用session會話的模式,設置事務進行批量發送。
序列化:在配置文件中,定義了jackson的序列化方式,如果不定義,就是使用默認的org.springframework.jms.support.converter.SimpleMessageConverter.toMessage序列化。在里面根據你發送消息的類型來序列化。

如果使用的是自定義的jackson序列化,發送的jmstemplate模板也需要注入jackson的序列化配置,在監聽容器配置中,也需要注入jackson的序列化配置。否則,如果發送的jmstemplate沒有注入,或者用的會話模式session.createTextMessage來發送的消息(自帶的序列化),在監聽收到消息后會序列化失敗,原因是,使用了jackson配置發送的消息,在內部會調用setTypeIdOnMessage,里面會塞入設置過的typeIdPropertyName。當消費者反序列化的時候,則會調用getJavaTypeForMessage方法,里面會判斷有沒有這個屬性,如果沒有則拋出異常。
總而言之,發送端和接收端的序列化配置必需同步。


當客戶端監聽配置的反序列化是jackson后,jmstemplate也要注入jackson配置。如果想要批量發送消息,可以使用下面的模版來構造一個消息對象,通過配置jackson后的jmstemplate方法是調用了setTypeIdOnMessage方法,在反序列化的時候不會出現上面異常問題:

Message message = jmsTemplate.getMessageConverter().toMessage(book, session);

消費者代碼和說明:

@Component public class ActiveListener {/*** 將開啟事務時候,方法內有異常則不會確認消費* 發生異常會進入重試模式,服務器按重試配置數推送,默認6次* 重試還是失敗,消息會進入死信隊列*/@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue")public void commonQueueListen(Book book, ActiveMQMessage message) throws Exception {System.out.println(book);// 手動確認消息,當開啟事務時,此設置無效message.acknowledge();}/*** 使用內置序列化的配置的監聽容器*/@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueueNoConver")public void commonQueueListen1(String book, ActiveMQMessage message) throws Exception {System.out.println(book);// 手動確認消息,當開啟事務時,此設置無效message.acknowledge();} }

當監聽容器開啟事務后, message.acknowledge()方法并沒有作用,事務對應的是自動確認,不受控制。當監聽容器關閉事務,應使用確認機制,一般手動確認,如果沒有確認,則消息不會被消費。

分組和并發消費

并發消費:如果想在發送消息并且由多個消費者一起并發消費,可以通過設置配置文件中的concurrency屬性,或者在@JmsListener中給注解屬性concurrency設置數量,如下圖。
分組消費:用于隊列模式。分組消費需要在生產者發送的消息中設置消息頭,使用setStringProperty來設置消息的頭屬性。在消費者端,在@JmsListener注解上添加消息頭過濾 selector =“JMSXGroupID=‘groupB’”,就可以完成分組消費。

生產者:

@RequestMapping("/groupQueue")public void groupQueue() throws InterruptedException {for (int i = 0; i < 20; i++) {Book book = new Book();book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);jmsTemplate.setSessionTransacted(true);// 可以用session會話模式生成的message來設置消息頭,我這里用模版發送,在生成message同時塞入屬性jmsTemplate.send(conmonQueue, session -> {Message message = jmsTemplate.getMessageConverter().toMessage(book, session);message.setStringProperty("JMSXGroupID","groupA");return message;});}}

消費者:

/*** 分組* selector,過濾頭屬性* concurrency,并發數量,可以生成多個消費者*/@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupA'")public void commonQueueListenGroupA(Book book, ActiveMQMessage message) throws Exception {System.out.println("groupA: " + book);// 手動確認消息,當開啟事務時,此設置無效message.acknowledge();}/*** 分組* selector,過濾頭屬性* concurrency,并發數量,可以生成多個消費者*/@JmsListener(concurrency = "10", destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupB'")public void commonQueueListenGroupB(Book book, ActiveMQMessage message) throws Exception {System.out.println("groupB: " + book);// 手動確認消息,當開啟事務時,此設置無效message.acknowledge();}

pub/sub模式(圖是借鑒來的):

topic模式待完善。。。。

四、解決的問題

此文主要是提供ActiveMQ基本的配置,生產者事務,消費者事務和確認機制,生產者和消費者的序列化問題。更詳細的介紹網上有很多資料可以查閱。主要是自己學習的時候,網上的代碼都沒有很好的解決問題,所以在這里記錄一下。

  • 序列化和反序列化必需同步:如果使用原生的session來發送消息是沒有配置序列化的(應該也是可以設置自定義的序列化配置的),監聽類也需要用自帶的默認的序列化方式來接受對象。如果使用了jackson序列化方式,在配置類中,jmstemplate和監聽容器都要注入此配置,并且發送的時候需要用帶有jackson配置的jmstemplate來發送消息,否則消息會序列化失敗。
  • 生產者的事務:如果用jmstemplate來發送消息,跟讀源碼會發現,如果設置setSessionTransacted為true,則會代理生成一個事務,并且發送了會commit。在網上用模版批量發送事務消息的例子。于是就用的session會話來控制批量消息事務的控制。如果要用自定義序列化,則可以用“Message message = jmsTemplate.getMessageConverter().toMessage(book, session);”來構造一個含有jackson序列化配置的消息。
  • 消費者的事務和確認機制:當事務設置為true的時候,和生產者一樣,在配置確認機制就不生效了。因為默認就是自動確認。當發生異常事務回滾,會進行重試階段,重試后如果失敗,則會進入死信隊列。當事務設置為false的時候,一般設置的是手動確認,只有確認后的數據才會出隊,否則數據會一直存在隊列里面(排除定時的消息)。
  • 批量事務:用模版方法發送貌似只能一條條的來,批量的話得用session來發送,并commit;
  • topic的代碼還沒有實現,還在學習中,后續補上。。。
  • 總結

    以上是生活随笔為你收集整理的学习笔记:ActiveMQ + SpringBoot 事务问题 序列化问题的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。