日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spring整合rabbitMQ最新版

發布時間:2024/9/27 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spring整合rabbitMQ最新版 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

          • 一、簡單對象
            • 1. 依賴
            • 2. 生產者
            • 3. 消費者
            • 4. 配置文件
            • 5. spring版本
          • 二、復雜對象
            • 2.1. 生產者
            • 2.2. 消費者

一、簡單對象
1. 依賴
<!--spring整合rabbitmq--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency>

注:maven方式,這一個依賴即可,如果是非maven項目,需要引入5個jar如下:

推薦使用mavne方式,簡單,非Maven項目,先用maven把以來下載本地倉庫,復制到非maven的項目中即可。

2. 生產者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生產者者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 Start--><!-- 定義一個隊列或者多個隊列 自動聲明--><rabbit:queue name="Queue-1" auto-declare="true" durable="true"/><rabbit:topic-exchange name="exchange-1"><rabbit:bindings><!-- 可綁定多個隊列,發送的時候指定key進行發送 --><rabbit:binding queue="Queue-1" pattern="ws.tjqb"/></rabbit:bindings></rabbit:topic-exchange><!--此處為配置文件方式 管控臺配置模式需要注釋 默認模式管控臺 End--><!-- 定義交換機 自動聲明--><rabbit:topic-exchange name="exchange-1"auto-declare="true" durable="true"/><!-- 5. 配置消息對象json轉換類 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 定義MQ消息模板1. id : 定義消息模板ID2.connection-factory : 把定義的連接工廠放到消息模板中3.confirm-callback : confirm確認機制4.return-callback : return確認機制5.mandatory :#有2種狀態設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除;設置為 false 后 消費者在消息沒有被路由到合適隊列情況下會自動刪除--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="exchange-1"confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener"mandatory="true"message-converter="jsonMessageConverter"/> </beans> package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID;@RestController public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 發送消息Map<String, String> map = new HashMap<>();map.put("email", "550731230@qq.com");rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}
3. 消費者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消費者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 聲明多個消費者對象 --><bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/><!-- 監聽隊列1. connectionFactory 連接工廠2. manual 手動簽收3. ref="" 消費者監聽--><rabbit:listener-container connection-factory="connectionFactory"acknowledge="manual"concurrency="${rabbitmq.concurrency}"max-concurrency="${rabbitmq.max-concurrency}"><rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/></rabbit:listener-container> </beans> package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener;import java.io.IOException; @Component public class EmailMessageListener implements MessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message) {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String email = jsonNode.get("email").asText();System.out.println("獲取隊列中消息:" + email);} catch (IOException e) {e.printStackTrace();}} }
4. 配置文件
#RabbitMQ 連接信息 #IP地址 rabbitmq.host=192.168.0.114 #端口 rabbitmq.port=5672 #用戶名 rabbitmq.username=fis #密碼 rabbitmq.password=ncl@1234 #虛擬主機 rabbitmq.vhost=/app/fisMQ #連接超時時間 rabbitmq.conTimeout=15000 #發送確認 對應RabbitTemplate.ConfirmCallback接口 #消息發送成功 有2個重要參數 # ack 狀態為true correlationId 全局唯一ID用于標識每一支隊列 rabbitmq.publisher-confirms=true #發送失敗回退,對應RabbitTemplate.ReturnCallback接口 rabbitmq.publisher-returns=true #默認消費者數量 rabbitmq.concurrency=10 #最大消費者數量 rabbitmq.max-concurrency=20
5. spring版本

目前適配的spring版本4.2.3.RELEASE

二、復雜對象

聲明:配置文件不變

2.1. 生產者
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID;@RestController public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {FisCallingTrace f = getFisCallingTrace();String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 發送消息Map<String, Object> map = new HashMap<>();map.put("mReqXml", "請求報文");map.put("mResXml", "響應報文");map.put("mUUID", uuidStr);map.put("serviceName", "NYHC");map.put("fisCallingTrace", f);rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}// @RequestMapping("/test")// public String test() {// String uuidStr = UUID.randomUUID().toString();// CorrelationData correlationId = new CorrelationData(uuidStr);// // 發送消息// Map<String, String> map = new HashMap<>();// map.put("email", "550731230@qq.com");// rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);// return "success";// }private FisCallingTrace getFisCallingTrace() {FisCallingTrace f = new FisCallingTrace();f.setServicename("tjqb");f.setServicetype("1");f.setInterfacetype("2");f.setResstatus("1");f.setResremark("紐約數據回傳接口");f.setReqdate(new Date());f.setReqtime("10:00:00");f.setResdate(new Date());f.setRestime("10:00:00");f.setReqxml("請求報文");f.setResxml("響應報文");return f;}}
2.2. 消費者
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.order.pojo.FisCallingTrace; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j @Component public class EmailMessageListener implements ChannelAwareMessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String mReqXml = jsonNode.get("mReqXml").asText();String mResXml = jsonNode.get("mResXml").asText();String mUUID = jsonNode.get("mUUID").asText();String serviceName = jsonNode.get("serviceName").asText();System.out.println("獲取隊列中消息:" + mReqXml);System.out.println("獲取隊列中消息:" + mResXml);System.out.println("獲取隊列中消息:" + mUUID);System.out.println("獲取隊列中消息:" + serviceName);JsonNode jsonNode1 = jsonNode.get("fisCallingTrace");String jsonStr = MAPPER.writeValueAsString(jsonNode1);FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class);System.out.println("獲取隊列中消息:" + f.getReqxml());System.out.println("獲取隊列中消息:" + f.getResxml());// 消息的標識,false只確認當前一個消息收到,true確認所有consumer獲得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}log.info("解析操作");log.info("落庫操作");} }

總結

以上是生活随笔為你收集整理的spring整合rabbitMQ最新版的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。