日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring cloud集成Rabbitmq

發布時間:2024/4/13 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring cloud集成Rabbitmq 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、配置pom

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.0.RELEASE</version> </dependency>

2、yml配置

spring:
? ?#rabbit mq
? ?rabbitmq:

? ? ?host: 192.168.1.146
? ? ?port: 5672
? ? ?username: guest
? ? ?password: guest

3、連接、交換器、隊列等設置

@Configuration public class RabbitMqConfig {/*** 組件發布EXCHANGE*/public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";/*** 組件發布消息隊列*/public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";public static final String ROUTING_KEY_="";@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/**** @return*/@Beanpublic FanoutExchange exchangeComponentPublished(){return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);}@Beanpublic Queue queueComponentPublished(){return new Queue(QUEUE_COMPONENT_PUBLISHED,false);}@Beanpublic FanoutExchange exchangeComponentSynced(){return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);}@Beanpublic FanoutExchange exchangeExtractionTask(){return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);}@Beanpublic Queue queueComponentSynced(){return new Queue(QUEUE_COMPONENT_SYNCED,false);}@Beanpublic Queue queueExtractionTask(){return new Queue(QUEUE_EXTRACTION_TASK,false);}@Beanpublic Binding componentSyncedBinding(){return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());}@Beanpublic Binding extractionTaskBinding(){return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}}

需要定義交換器,隊列,綁定,會自動注冊。

4、生產者

@Component public class ExtractionTaskMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(ExtractionTaskMessage msg){rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());ObjectMapper mapper = new ObjectMapper();Message m = null;try {m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();} catch (JsonProcessingException e) {e.printStackTrace();}rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);} }

使用RabbitTemplate發送消息。

5、消費者

@Component @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK) public class ExtractionTaskMessageReceiver {private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);@Autowiredprivate DataExtractorFactory dataExtractorFactory;@Autowiredprivate ExtractionTaskService extractionTaskService;@Autowiredprivate ExtractionDataService extractionDataService;@RabbitHandlerpublic void process(@Payload ExtractionTaskMessage msg) {try {} catch (DataExtractionException e){logger.error( "",e.getMessage());}catch (Exception e) {//異常處理logger.error(e.getMessage());}}} 使用 @RabbitListener 進行監聽,@RabbitHandler定義消息處理方法。

在進行監聽時,會查詢所有@RabbitHandler注解的消息處理方法,如果沒有參數類型匹配的方法,則異常。

@RabbitListener 注意

  • 消息處理方法參數是由 MessageConverter 轉化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置(默認 Spring 使用的實現是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常:

    • application/octet-stream:二進制字節數組存儲,使用 byte[]
    • application/x-java-serialized-object:java 對象序列化格式存儲。使用 Object、相應類型(反序列化時類型應該同包同名,否者會拋出找不到類異常)
    • text/plain:文本數據類型存儲。使用 String
    • application/json:JSON 格式。使用 Object、相應類型

注意:@RabbitListener注解在類上或方法上,行為不一樣。在類上,消費方法參數類型不可以設置為Message。在方法上,方法類型可以為Message

MessageConvert

  • 涉及網絡傳輸的應用序列化不可避免,發送端以某種規則將消息轉成 byte 數組進行發送,接收端則以約定的規則進行 byte[] 數組的解析
  • RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter 等
  • 當調用了 convertAndSend 方法時會使用 MessageConvert 進行消息的序列化
  • SimpleMessageConverter 對于要發送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差
  • 當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

設置MessageConvert

Json格式

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}

?自定義

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {return null;}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){return (User)ois.readObject();}catch (Exception e){e.printStackTrace();return null;}}});return factory;}

@Payload 與 @Headers

  • 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
? ? System.out.println("body:"+body);
? ? System.out.println("Headers:"+headers);
}

  • 也可以獲取單個 Header 屬性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
? ? System.out.println("body:"+body);
? ? System.out.println("token:"+token);
}
?

通過 @RabbitListener 注解聲明 Binding

  • 通過 @RabbitListener 的 bindings 屬性聲明 Binding(若 RabbitMQ 中不存在該綁定所需要的 Queue、Exchange、RouteKey 則自動創建,若存在則拋出異常)
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),value = @Queue(value = "consumer_queue",durable = "true"),key = "key.#" )) public void processMessage1(Message message) {System.out.println(message); }

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
  • @RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型
@Component @RabbitListener(queues = "consumer_queue") public class Receiver {@RabbitHandlerpublic void processMessage1(String message) {System.out.println(message);}@RabbitHandlerpublic void processMessage2(byte[] message) {System.out.println(new String(message));}}

?

?

?

總結

以上是生活随笔為你收集整理的Spring cloud集成Rabbitmq的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。