日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

使用Spring Cloud Stream与RabbitMQ集成

發(fā)布時(shí)間:2023/12/3 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Spring Cloud Stream与RabbitMQ集成 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在我以前的文章中,我寫了兩個(gè)系統(tǒng)之間非常簡(jiǎn)單的集成場(chǎng)景-一個(gè)生成一個(gè)工作單元,另一個(gè)處理該工作單元,以及Spring Integration如何使這種集成非常容易。

在這里,我將演示如何使用Spring Cloud Stream進(jìn)一步簡(jiǎn)化此集成方案

我在這里有示例代碼– pom.xml中提供了適用于Spring Cloud Stream的正確maven依賴關(guān)系。

制片人

因此,再次從負(fù)責(zé)生成工作單元的生產(chǎn)者開始。 將消息發(fā)送到RabbitMQ所需的代碼明智的全部工作就是按照以下方式進(jìn)行Java配置:

@Configuration @EnableBinding(WorkUnitsSource.class) @IntegrationComponentScan public class IntegrationConfiguration {}

從表面上看,這看似簡(jiǎn)單,但在幕后做了很多事情,據(jù)我了解并從文檔中了解到,這些是該配置觸發(fā)的:

1.創(chuàng)建基于綁定到@EnableBinding批注的類的Spring Integration消息通道。 上面的WorkUnitsSource類是一個(gè)稱為“ worksChannel”的自定義通道的定義,如下所示:

import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface WorkUnitsSource {String CHANNEL_NAME = "worksChannel";@OutputMessageChannel worksChannel();}

2.根據(jù)運(yùn)行時(shí)可用的“綁定程序”實(shí)現(xiàn)(例如RabbitMQ,Kaffka,Redis,Gemfire),上一步中的通道將連接到系統(tǒng)中的適當(dāng)結(jié)構(gòu)–因此,例如,我希望我的“ worksChannel”依次發(fā)送消息到RabbitMQ,Spring Cloud Stream將負(fù)責(zé)在RabbitMQ中自動(dòng)創(chuàng)建主題交換

我希望就數(shù)據(jù)如何發(fā)送到RabbitMQ進(jìn)行一些進(jìn)一步的自定義-特別是我希望域?qū)ο笤诎l(fā)送之前先序列化為json,并且我想指定將有效負(fù)載發(fā)送到的RabbitMQ交換的名稱。由某些配置控制,這些配置可以使用yaml文件以以下方式附加到通道:

spring:cloud:stream:bindings:worksChannel:destination: work.exchangecontentType: application/jsongroup: testgroup

最后一個(gè)細(xì)節(jié)是應(yīng)用程序其余部分與Spring Cloud Stream交互的方式,可以通過定義消息網(wǎng)關(guān)直接在Spring Integration中完成:

import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import works.service.domain.WorkUnit;@MessagingGateway public interface WorkUnitGateway {@Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)void generate(WorkUnit workUnit);}

基本上就是這樣,Spring Cloud Stream現(xiàn)在將連接整個(gè)Spring集成流程,并在RabbitMQ中創(chuàng)建適當(dāng)?shù)慕Y(jié)構(gòu)。

消費(fèi)者

與生產(chǎn)者類似,首先我想定義一個(gè)名為“ worksChannel”的通道,該通道將處理來自RabbitMQ的傳入消息:

import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface WorkUnitsSink {String CHANNEL_NAME = "worksChannel";@InputSubscribableChannel worksChannel(); }

然后讓Spring Cloud Stream根據(jù)此定義創(chuàng)建通道和RabbitMQ綁定:

import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration;@Configuration @EnableBinding(WorkUnitsSink.class) public class IntegrationConfiguration {}

為了處理消息,Spring Cloud Stream提供了一個(gè)偵聽器,可以通過以下方式創(chuàng)建它:

@Service public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);@StreamListener(WorkUnitsSink.CHANNEL_NAME)public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());} }

最后是將這個(gè)通道連接到y(tǒng)aml文件中表示的RabbitMQ基礎(chǔ)結(jié)構(gòu)的配置:

spring:cloud:stream:bindings:worksChannel:destination: work.exchangegroup: testgroup

現(xiàn)在,如果啟動(dòng)了生產(chǎn)者和任何數(shù)量的使用者,則通過生產(chǎn)者發(fā)送的消息將作為json發(fā)送到Rabbit MQ主題交換,由使用者檢索,反序列化為對(duì)象并傳遞給工作處理器。

現(xiàn)在,純粹由Spring Cloud Stream庫(kù)按照慣例處理創(chuàng)建RabbitMQ基礎(chǔ)結(jié)構(gòu)所涉及的大量樣板。 盡管Spring Cloud Stream嘗試提供原始Spring Integration的基礎(chǔ),但是掌握Spring Integration的基本知識(shí)以有效使用Spring Cloud Stream還是很有用的。

此處描述的示例可在我的github存儲(chǔ)庫(kù)中找到

翻譯自: https://www.javacodegeeks.com/2016/08/integrating-rabbitmq-using-spring-cloud-stream.html

總結(jié)

以上是生活随笔為你收集整理的使用Spring Cloud Stream与RabbitMQ集成的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。