javascript
Spring接入RabbitMQ
在原有的Spring項目中接入RabbitMQ,使用的Spring集成RabbitMQ,這樣 RabbitMQ的功能調用通過Spring封裝,調用更加簡潔。
首先pom 文件中引入依賴
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>${spring-rabbit-version}</version><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></exclusion></exclusions></dependency>
因為已有項目中Spring的版本與引入的spring-rabbit包中依賴的版本不一致,故exclusions中設置了多條 exclusion。
引入 jar 包后,開始 RabbitMQ 的配置,思路等同于數據庫,要將程序與 RabbitMQ建立連接,以配置的形式。官網給出的 demo中,RabbitMQ接入 Spring 以xml 文件的形式;接入 SpringBoot以注解的形式。以下以xml 配置的形式。配置文件如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"><!-- 連接服務配置 --><!-- 消息未達到exchange, confirm callback and ack = false ; 消息達到 exchange, confirm callback and ack = true --><!-- 消息入 queue成功,no return callback ; 消息入 queue 失敗,return callback --><rabbit:connection-factory id="mqConnectionFactory" host="${rabbitMQ.host}" username="${rabbitMQ.username}" password="${rabbitMQ.password}" port="${rabbitMQ.port}" publisher-confirms="true" publisher-returns="true"/><rabbit:admin connection-factory="mqConnectionFactory"/><!-- 配置可用 rabbit 注解 --><!-- <rabbit:annotation-driven/><bean id="rabbitListenerContainerFactory"class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"><property name="connectionFactory" ref="mqConnectionFactory"/><property name="concurrentConsumers" value="3"/><property name="maxConcurrentConsumers" value="10"/></bean> --><rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container><!-- exchange - rabbitTemplate 配置 多個名稱不同,注入使用 @Qualifier mandatory set true, return callback can success --><rabbit:template id="rabbitTemplate" connection-factory="mqConnectionFactory"confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/> </beans>將這個配置文件加載到 spring 的配置文件中,文件中的 host等通過 properties 文件設置。
生產者,消費者部分:
生產者:在需要使用的類中注入
@Autowired private RabbitTemplate rabbitTemplate;生產部分
rabbitTemplate.convertAndSend("e.crm", "k.crm.push.msg", JSON.toJSONString(msg));第一個參數(e.crm)指定 exchange,第二個參數(k.crm.push.msg)指定binding key,第三個參數為消息內容。
消費部分
@Service("msgSenderConsumerServiceImpl") public class MsgSenderConsumerServiceImpl implements ChannelAwareMessageListener {public void onMessage(Message message, Channel channel) throws IOException {String taskStr = new String(message.getBody(), "UTF-8");}實現的是ChannelAwareMessageListener接口,為在方法中使用Channel,不需要使用的話可直接實現MessageListener接口。
配置中生產者確認機制在配置中需要的類有:
@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//exchange未到達成功 ack=false//處理邏輯 }} @Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//入隊列未成功,則調用此方法//處理邏輯 }}通過以上配置,RabbitMQ 接入完成。
但是哩,每個 queue的接入都需要修改配置文件,雖然解耦的方式,但是queue 接入后業務比較固定,修改配置文件就不如注解方式靈活。故接入將消費者修改成注解的方式。
放開 RabbitMQ配置中可使用注解的配置,這樣在類中可直接使用 RabbitMQ相關的注解。
@Service("msgSenderConsumerServiceImpl") @RabbitListener(queues = "q.crm.msg") public class MsgSenderConsumerServiceImpl {@RabbitHandlerpublic void onMessage(@Payload String message) throws IOException {System.out.println(message);}}這種形式則 RabbitMQ配置文件中的這部分就不需要了
<rabbit:listener-container connection-factory="mqConnectionFactory"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container>好,接下來,我們的業務想要在消費者端手動確認,
消費端通過 xml配置時比較好設置,如下:
<rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgSenderConsumerServiceImpl"/> </rabbit:listener-container>但是哩,我們想要用注解的形式,從網上搜了各種資料,一直沒找到解決方案。
注解使用@RabbitListener時,需要在配置文件中指定rabbitListenerContainerFactory,但是手動確認的設置是在rabbitListenerContainer中,這樣就找不到設置的入口。
網上搜索的很多資料,通過設置:spring.rabbitmq.listener.simple.acknowledge-mode=manual
我將設置添加到 spring 的配置文件中,也是不好使,這個配置應該是 Spring-Boot 項目中有效的。
那直接設置的方案使用沒有找到可用的,最終的解決方案是,使用自己定義的rabbitListenerContainerFactory,
內容完全參照的spring-rabbit包中的SimpleRabbitListenerContainerFactory類,修改部分是:
@Overrideprotected SimpleMessageListenerContainer createContainerInstance() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}這樣手動確認就配置好了,手動確認消費者端的代碼為:
@Service("msgSenderConsumerServiceImpl") @RabbitListener(queues = "q.crm.msg") public class MsgSenderConsumerServiceImpl {@RabbitHandlerpublic void onMessage(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {//處理邏輯channel.basicAck(deliveryTag, false);}以上~
?
轉載于:https://www.cnblogs.com/youyj/p/7332414.html
總結
以上是生活随笔為你收集整理的Spring接入RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微星350怎么u盘启动 微星350如何使
- 下一篇: javascript初级代码块