javascript
使用Spring Cloud Stream与RabbitMQ集成
在我以前的文章中,我寫了兩個(gè)系統(tǒng)之間非常簡(jiǎn)單的集成場(chǎng)景-一個(gè)生成一個(gè)工作單元,另一個(gè)處理該工作單元,以及Spring Integration如何使這種集成非常容易。
在這里,我將演示如何使用Spring Cloud Stream進(jìn)一步簡(jiǎn)化此集成方案
我在這里有示例代碼– pom.xml中提供了適用于Spring Cloud Stream的正確maven依賴關(guān)系。
制片人
因此,再次從負(fù)責(zé)生成工作單元的生產(chǎn)者開始。 將消息發(fā)送到RabbitMQ所需的代碼明智的全部工作就是按照以下方式進(jìn)行Java配置:
@Configuration @EnableBinding(WorkUnitsSource.class) @IntegrationComponentScan public class IntegrationConfiguration {}從表面上看,這看似簡(jiǎn)單,但在幕后做了很多事情,據(jù)我了解并從文檔中了解到,這些是該配置觸發(fā)的:
1.創(chuàng)建基于綁定到@EnableBinding批注的類的Spring Integration消息通道。 上面的WorkUnitsSource類是一個(gè)稱為“ worksChannel”的自定義通道的定義,如下所示:
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface WorkUnitsSource {String CHANNEL_NAME = "worksChannel";@OutputMessageChannel worksChannel();}2.根據(jù)運(yùn)行時(shí)可用的“綁定程序”實(shí)現(xiàn)(例如RabbitMQ,Kaffka,Redis,Gemfire),上一步中的通道將連接到系統(tǒng)中的適當(dāng)結(jié)構(gòu)–因此,例如,我希望我的“ worksChannel”依次發(fā)送消息到RabbitMQ,Spring Cloud Stream將負(fù)責(zé)在RabbitMQ中自動(dòng)創(chuàng)建主題交換
我希望就數(shù)據(jù)如何發(fā)送到RabbitMQ進(jìn)行一些進(jìn)一步的自定義-特別是我希望域?qū)ο笤诎l(fā)送之前先序列化為json,并且我想指定將有效負(fù)載發(fā)送到的RabbitMQ交換的名稱。由某些配置控制,這些配置可以使用yaml文件以以下方式附加到通道:
spring:cloud:stream:bindings:worksChannel:destination: work.exchangecontentType: application/jsongroup: testgroup最后一個(gè)細(xì)節(jié)是應(yīng)用程序其余部分與Spring Cloud Stream交互的方式,可以通過定義消息網(wǎng)關(guān)直接在Spring Integration中完成:
import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import works.service.domain.WorkUnit;@MessagingGateway public interface WorkUnitGateway {@Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)void generate(WorkUnit workUnit);}基本上就是這樣,Spring Cloud Stream現(xiàn)在將連接整個(gè)Spring集成流程,并在RabbitMQ中創(chuàng)建適當(dāng)?shù)慕Y(jié)構(gòu)。
消費(fèi)者
與生產(chǎn)者類似,首先我想定義一個(gè)名為“ worksChannel”的通道,該通道將處理來自RabbitMQ的傳入消息:
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface WorkUnitsSink {String CHANNEL_NAME = "worksChannel";@InputSubscribableChannel worksChannel(); }然后讓Spring Cloud Stream根據(jù)此定義創(chuàng)建通道和RabbitMQ綁定:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration;@Configuration @EnableBinding(WorkUnitsSink.class) public class IntegrationConfiguration {}為了處理消息,Spring Cloud Stream提供了一個(gè)偵聽器,可以通過以下方式創(chuàng)建它:
@Service public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);@StreamListener(WorkUnitsSink.CHANNEL_NAME)public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());} }最后是將這個(gè)通道連接到y(tǒng)aml文件中表示的RabbitMQ基礎(chǔ)結(jié)構(gòu)的配置:
spring:cloud:stream:bindings:worksChannel:destination: work.exchangegroup: testgroup現(xiàn)在,如果啟動(dòng)了生產(chǎn)者和任何數(shù)量的使用者,則通過生產(chǎn)者發(fā)送的消息將作為json發(fā)送到Rabbit MQ主題交換,由使用者檢索,反序列化為對(duì)象并傳遞給工作處理器。
現(xiàn)在,純粹由Spring Cloud Stream庫(kù)按照慣例處理創(chuàng)建RabbitMQ基礎(chǔ)結(jié)構(gòu)所涉及的大量樣板。 盡管Spring Cloud Stream嘗試提供原始Spring Integration的基礎(chǔ),但是掌握Spring Integration的基本知識(shí)以有效使用Spring Cloud Stream還是很有用的。
此處描述的示例可在我的github存儲(chǔ)庫(kù)中找到
翻譯自: https://www.javacodegeeks.com/2016/08/integrating-rabbitmq-using-spring-cloud-stream.html
總結(jié)
以上是生活随笔為你收集整理的使用Spring Cloud Stream与RabbitMQ集成的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓设置按钮背景颜色(安卓设置按钮)
- 下一篇: Spring Batch:多种格式输出编