日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring接入RabbitMQ

發布時間:2024/10/12 javascript 76 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring接入RabbitMQ 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  在原有的Spring項目中接入RabbitMQ,使用的Spring集成RabbitMQ,這樣 RabbitMQ的功能調用通過Spring封裝,調用更加簡潔。

  首先pom 文件中引入依賴

  

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>${spring-rabbit-version}</version><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></exclusion></exclusions></dependency>

因為已有項目中Spring的版本與引入的spring-rabbit包中依賴的版本不一致,故exclusions中設置了多條 exclusion。

引入 jar 包后,開始 RabbitMQ 的配置,思路等同于數據庫,要將程序與 RabbitMQ建立連接,以配置的形式。官網給出的 demo中,RabbitMQ接入 Spring 以xml 文件的形式;接入 SpringBoot以注解的形式。以下以xml 配置的形式。配置文件如下:

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"><!-- 連接服務配置 --><!-- 消息未達到exchange, confirm callback and ack = false ; 消息達到 exchange, confirm callback and ack = true --><!-- 消息入 queue成功,no return callback ; 消息入 queue 失敗,return callback --><rabbit:connection-factory id="mqConnectionFactory" host="${rabbitMQ.host}" username="${rabbitMQ.username}" password="${rabbitMQ.password}" port="${rabbitMQ.port}" publisher-confirms="true" publisher-returns="true"/><rabbit:admin connection-factory="mqConnectionFactory"/><!-- 配置可用 rabbit 注解 --><!-- <rabbit:annotation-driven/><bean id="rabbitListenerContainerFactory"class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"><property name="connectionFactory" ref="mqConnectionFactory"/><property name="concurrentConsumers" value="3"/><property name="maxConcurrentConsumers" value="10"/></bean> --><rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container><!-- exchange - rabbitTemplate 配置 多個名稱不同,注入使用 @Qualifier mandatory set true, return callback can success --><rabbit:template id="rabbitTemplate" connection-factory="mqConnectionFactory"confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/> </beans>

將這個配置文件加載到 spring 的配置文件中,文件中的 host等通過 properties 文件設置。

生產者,消費者部分:

生產者:在需要使用的類中注入

@Autowired private RabbitTemplate rabbitTemplate;

生產部分

rabbitTemplate.convertAndSend("e.crm", "k.crm.push.msg", JSON.toJSONString(msg));

第一個參數(e.crm)指定 exchange,第二個參數(k.crm.push.msg)指定binding key,第三個參數為消息內容。

消費部分

@Service("msgSenderConsumerServiceImpl") public class MsgSenderConsumerServiceImpl implements ChannelAwareMessageListener {public void onMessage(Message message, Channel channel) throws IOException {String taskStr = new String(message.getBody(), "UTF-8");}

實現的是ChannelAwareMessageListener接口,為在方法中使用Channel,不需要使用的話可直接實現MessageListener接口。

配置中生產者確認機制在配置中需要的類有:

@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//exchange未到達成功 ack=false//處理邏輯 }} @Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//入隊列未成功,則調用此方法//處理邏輯 }}

通過以上配置,RabbitMQ 接入完成。

但是哩,每個 queue的接入都需要修改配置文件,雖然解耦的方式,但是queue 接入后業務比較固定,修改配置文件就不如注解方式靈活。故接入將消費者修改成注解的方式。

放開 RabbitMQ配置中可使用注解的配置,這樣在類中可直接使用 RabbitMQ相關的注解。

@Service("msgSenderConsumerServiceImpl") @RabbitListener(queues = "q.crm.msg") public class MsgSenderConsumerServiceImpl {@RabbitHandlerpublic void onMessage(@Payload String message) throws IOException {System.out.println(message);}}

這種形式則 RabbitMQ配置文件中的這部分就不需要了

<rabbit:listener-container connection-factory="mqConnectionFactory"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container>

好,接下來,我們的業務想要在消費者端手動確認,

消費端通過 xml配置時比較好設置,如下:

<rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container>

但是哩,我們想要用注解的形式,從網上搜了各種資料,一直沒找到解決方案。

注解使用@RabbitListener時,需要在配置文件中指定rabbitListenerContainerFactory,但是手動確認的設置是在rabbitListenerContainer中,這樣就找不到設置的入口。

網上搜索的很多資料,通過設置:spring.rabbitmq.listener.simple.acknowledge-mode=manual

我將設置添加到 spring 的配置文件中,也是不好使,這個配置應該是 Spring-Boot 項目中有效的。

那直接設置的方案使用沒有找到可用的,最終的解決方案是,使用自己定義的rabbitListenerContainerFactory,

內容完全參照的spring-rabbit包中的SimpleRabbitListenerContainerFactory類,修改部分是:

@Overrideprotected SimpleMessageListenerContainer createContainerInstance() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}

這樣手動確認就配置好了,手動確認消費者端的代碼為:

@Service("msgSenderConsumerServiceImpl") @RabbitListener(queues = "q.crm.msg") public class MsgSenderConsumerServiceImpl {@RabbitHandlerpublic void onMessage(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {//處理邏輯channel.basicAck(deliveryTag, false);}

以上~

?

轉載于:https://www.cnblogs.com/youyj/p/7332414.html

總結

以上是生活随笔為你收集整理的Spring接入RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。