spring集成mq_使用Spring Integration Java DSL与Rabbit MQ集成
spring集成mq
我最近參加了在拉斯維加斯舉行的2016年Spring大會 ,很幸運地看到了我在軟件世界中長期敬佩的一些人。 我親自遇到了其中的兩個人,他們實際上合并了幾年前我與Spring Integration相關的一些次要貢獻– Gary Russel和Artem Bilan ,他們啟發了我重新審視我已經有一段時間沒有使用過的Spring Integration 。
我再一次想起了Spring Integration如何使任何復雜的Enterprise集成方案看起來都很簡單。 我很高興看到基于Spring Integration Java的DSL現在已經完全集成到Spring Integration傘和更高層次的抽象中,例如Spring Cloud Stream(感謝我的好朋友和這個項目的貢獻者,此介紹
Soby Chacko ),這使得某些消息驅動的場景更加容易。
在本文中,我只是在回顧與RabbitMQ的一個非常簡單的集成方案,在以后的文章中,將使用Spring Cloud Stream重新實現它。
考慮一個場景,其中兩個服務之間通過RabbitMQ代理相互通信,其中一個生成某種工作,另一種處理該工作。
制片人
可以使用Spring Integration Java DSL以以下方式在代碼中表示工作單元產生/分發部分:
@Configuration public class WorksOutbound {@Autowiredprivate RabbitConfig rabbitConfig;@Beanpublic IntegrationFlow toOutboundQueueFlow() {return IntegrationFlows.from("worksChannel").transform(Transformers.toJson()).handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate())).get();} }這是非常容易理解的-該流程首先從名為“ worksChannel”的通道中讀取一條消息,然后將該消息轉換為json,然后使用出站通道適配器將其分發給RabbitMQ交換。 現在,消息如何到達名為“ worksChannel”的通道-我已經通過Messaging網關(Spring Integration世界的入口)配置了消息-
@MessagingGateway public interface WorkUnitGateway {@Gateway(requestChannel = "worksChannel")void generate(WorkUnit workUnit);}因此,現在,如果Java客戶端想要向Rabbitmq派遣“工作單元”,則調用將如下所示:
WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), definition); workUnitGateway.generate(sampleWorkUnit);我在這里刷了幾件事-特別是Rabbit MQ配置,該配置由工廠運行,但是可以在此處使用
消費者
沿著生產者的思路,消費者流程將從接收來自RabbitMQ隊列的消息開始,將其轉換為域模型,然后處理該消息,使用Spring Integration Java DSL通過以下方式表示:
@Configuration public class WorkInbound {@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, rabbitConfig.worksQueue()).concurrentConsumers(3)).transform(Transformers.fromJson(WorkUnit.class)).handle("workHandler", "process").get();} }代碼應該很直觀,上面的workHandler是一個簡單的Java pojo,看起來像這樣,完成了僅記錄有效負載的非常重要的工作:
@Service public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());} } 本質上就是這樣,如果使用直接的Java和原始RabbitMQ庫嘗試使用Spring Integration,它將提供相當出色的代碼外觀。
Spring Cloud Stream使整個設置更加簡單,并且將成為以后的主題。
如果您有興趣嘗試一下,我已將整個代碼發布在我的github倉庫中 。
翻譯自: https://www.javacodegeeks.com/2016/08/integrating-rabbit-mq-using-spring-integration-java-dsl.html
spring集成mq
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的spring集成mq_使用Spring Integration Java DSL与Rabbit MQ集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 虚拟内存linux(虚拟内存 linux
- 下一篇: java向后兼容吗_Java向后不兼容历