javascript
RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
作者 | RocketMQ 官微
來源|阿里巴巴云原生公眾號(hào)
2019 年 1 月,孵化 6 個(gè)月的 RocketMQ-Spring 作為 Apache RocketMQ 的子項(xiàng)目正式畢業(yè),發(fā)布了第一個(gè) Release 版本 2.0.1。該項(xiàng)目是把 RocketMQ 的客戶端使用 Spring Boot 的方式進(jìn)行了封裝,可以讓用戶通過簡(jiǎn)單的 annotation 和標(biāo)準(zhǔn)的 Spring Messaging API 編寫代碼來進(jìn)行消息的發(fā)送和消費(fèi)。當(dāng)時(shí) RocketMQ 社區(qū)同學(xué)請(qǐng) Spring 社區(qū)的同學(xué)對(duì) RocketMQ-Spring 代碼進(jìn)行 review,引出一段羅美琪(RocketMQ)和春波特(Spring Boot)的故事。
時(shí)隔兩年,RocketMQ-Spring 正式發(fā)布 2.2.0。在這期間,RocketMQ-Spring 迭代了數(shù)個(gè)版本,以 RocketMQ-Spring 為基礎(chǔ)實(shí)現(xiàn)的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 Spring 的官網(wǎng),Spring 布道師 baeldung 向國(guó)外同學(xué)介紹如何使用 RocketMQ-Spring,越來越多國(guó)內(nèi)外的同學(xué)開始使用 RocketMQ-Spring 收發(fā)消息,RocketMQ-Spring 倉庫的 star 數(shù)也在短短兩年時(shí)間內(nèi)超越了 Spring-Kafka 和 Spring-AMQP(注:兩者均由 Spring 社區(qū)維護(hù)),成為 Apache RocketMQ 最受歡迎的生態(tài)項(xiàng)目之一。
RocketMQ-Spring 的受歡迎一方面得益于支持豐富業(yè)務(wù)場(chǎng)景的 RocketMQ 與微服務(wù)生態(tài) Spring 的完美契合,另一方面也與 RocketMQ-Spring 本身嚴(yán)格遵循 Spring Messaging API 規(guī)范,支持豐富的消息類型分不開。
遵循 Spring Messaging API 規(guī)范
Spring Messaging 提供了一套抽象的 API,對(duì)消息發(fā)送端和消息接收端的模式進(jìn)行規(guī)定,不同的消息中間件提供商可以在這個(gè)模式下提供自己的 Spring 實(shí)現(xiàn):在消息發(fā)送端需要實(shí)現(xiàn)的是一個(gè) XXXTemplate 形式的 Java Bean,結(jié)合 Spring Boot 的自動(dòng)化配置選項(xiàng)提供多個(gè)不同的發(fā)送消息方法;在消息的消費(fèi)端是一個(gè) XXXMessageListener 接口(實(shí)現(xiàn)方式通常會(huì)使用一個(gè)注解來聲明一個(gè)消息驅(qū)動(dòng)的 POJO),提供回調(diào)方法來監(jiān)聽和消費(fèi)消息,這個(gè)接口同樣可以使用 Spring Boot 的自動(dòng)化選項(xiàng)和一些定制化的屬性。
1. 發(fā)送端
RocketMQ-Spring 在遵循 Spring Messaging API 規(guī)范的基礎(chǔ)上結(jié)合 RocketMQ 自身的功能特點(diǎn)提供了相應(yīng)的 API。在消息的發(fā)送端,RocketMQ-Spring 通過實(shí)現(xiàn) RocketMQTemplate 完成消息的發(fā)送。如下圖所示,RocketMQTemplate 繼承 AbstractMessageSendingTemplate 抽象類,來支持 Spring Messaging API 標(biāo)準(zhǔn)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會(huì)代理給 doSend 方法,doSend 方法會(huì)最終調(diào)用 syncSend,由 DefaultMQProducer 實(shí)現(xiàn)。
除 Spring Messaging API 規(guī)范中的方法,RocketMQTemplate 還實(shí)現(xiàn)了 RocketMQ 原生客戶端的一些方法,來支持更加豐富的消息類型。值得注意的是,相比于原生客戶端需要自己去構(gòu)建 RocketMQ Message(比如將對(duì)象序列化成 byte 數(shù)組放入 Message 對(duì)象),RocketMQTemplate 可以直接將對(duì)象、字符串或者 byte 數(shù)組作為參數(shù)發(fā)送出去(對(duì)象序列化操作由 RocketMQ-Spring 內(nèi)置完成),在消費(fèi)端約定好對(duì)應(yīng)的 Schema 即可正常收發(fā)。
RocketMQTemplate Send API:SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message<?> message)void asyncSend(String destination, Message<?> message, SendCallback sendCallback)void asyncSend(String destination, Message<?> message, SendCallback sendCallback)……2. 消費(fèi)端
在消費(fèi)端,需要實(shí)現(xiàn)一個(gè)包含 @RocketMQMessageListener 注解的類(需要實(shí)現(xiàn) RocketMQListener 接口,并實(shí)現(xiàn) onMessage 方法,在注解中進(jìn)行 topic、consumerGroup 等屬性配置),這個(gè) Listener 會(huì)一對(duì)一的被放置到 DefaultRocketMQListenerContainer 容器對(duì)象中,容器對(duì)象會(huì)根據(jù)消費(fèi)的方式(并發(fā)或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內(nèi)部的并發(fā)或者順序接口實(shí)現(xiàn)。在容器中創(chuàng)建 RocketMQ DefaultPushConsumer 對(duì)象,啟動(dòng)并監(jiān)聽定制的 Topic 消息,完成約定 Schema 對(duì)象的轉(zhuǎn)換,回調(diào)到 Listener 的 onMessage 方法。
@Service @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.printf("------- StringConsumer received: %s \n", message);} }除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 實(shí)現(xiàn)了 RocketMQ Lite Pull Consumer。通過在配置文件中進(jìn)行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主動(dòng) Pull 消息。
配置文件resource/application.properties:rocketmq.name-server=localhost:9876 rocketmq.consumer.group=my-group1 rocketmq.consumer.topic=testPull Consumer代碼:while(!isStop) {List<String> messages = rocketMQTemplate.receive(String.class);System.out.println(messages); }豐富的消息類型
RocketMQ Spring 消息類型支持方面與 RocketMQ 原生客戶端完全對(duì)齊,包括同步/異步/one-way、順序、延遲、批量、事務(wù)以及 Request-Reply 消息。在這里,主要介紹較為特殊的事務(wù)消息和 request-reply 消息。
1. 事務(wù)消息
RocketMQ 的事務(wù)消息不同于 Spring Messaging 中的事務(wù)消息,依然采用 RocketMQ 原生事務(wù)消息的方案。如下所示,發(fā)送事務(wù)消息時(shí)需要實(shí)現(xiàn)一個(gè)包含 @RocketMQTransactionListener 注解的類,并實(shí)現(xiàn) executeLocalTransaction 和 checkLocalTransaction 方法,從而來完成執(zhí)行本地事務(wù)以及檢查本地事務(wù)執(zhí)行結(jié)果。
// Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);// Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// ... local transaction process, return bollback, commit or unknownreturn RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// ... check transaction status and return bollback, commit or unknownreturn RocketMQLocalTransactionState.COMMIT;} }在 2.1.0 版本中,RocketMQ-Spring 重構(gòu)了事務(wù)消息的實(shí)現(xiàn),如下圖所示,舊版本中每一個(gè) group 對(duì)應(yīng)一個(gè) TransactionProducer,而在新版本中改為每一個(gè) RocketMQTemplate 對(duì)應(yīng)一個(gè) TransationProducer,從而解決了并發(fā)使用多個(gè)事務(wù)消息的問題。當(dāng)用戶需要在單進(jìn)程使用多個(gè)事務(wù)消息時(shí),可以使用 ExtRocketMQTemplate 來完成(一般情況下,推薦一個(gè)進(jìn)程使用一個(gè) RocketMQTemplate,ExtRocketMQTemplate 可以使用在同進(jìn)程中需要使用多個(gè) Producer / LitePullConsumer 的場(chǎng)景,可以為 ExtRocketMQTemplate 指定與標(biāo)準(zhǔn)模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在對(duì)應(yīng)的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 為 ExtRocketMQTemplate 的 BeanName。
2. Request-Reply 消息
在 2.1.0 版本中,RocketMQ-Spring 開始支持 Request-Reply 消息。Request-Reply 消息指的是上游服務(wù)投遞消息后進(jìn)入等待被通知的狀態(tài),直到消費(fèi)端返回結(jié)果并返回給發(fā)送端。在 RocketMQ-Spring 中,發(fā)送端通過 RocketMQTemplate 的 sendAndReceivce 方法進(jìn)行發(fā)送,如下所示,主要有同步和異步兩種方式。異步方式中通過實(shí)現(xiàn) RocketMQLocalRequestCallback 進(jìn)行回調(diào)。
// 同步發(fā)送request并且等待String類型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);// 異步發(fā)送request并且等待User類型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {@Override public void onSuccess(User message) {……}@Override public void onException(Throwable e) {……} });在消費(fèi)端,仍然需要實(shí)現(xiàn)一個(gè)包含 @RocketMQMessageListener 注解的類,但需要實(shí)現(xiàn)的接口是 RocketMQReplyListener<T, R> 接口(普通消息為 RocketMQListener 接口),其中 T 表示接收值的類型,R 表示返回值的類型,接口需要實(shí)現(xiàn)帶返回值的 onMessage 方法,返回值的內(nèi)容返回給對(duì)應(yīng)的 Producer。
@Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {@Overridepublic String onMessage(String message) {……return "reply string";} }RocketMQ-Spring 遵循 Spring 約定大于配置(Convention over configuration)的理念,通過啟動(dòng)器(Spring Boot Starter)的方式,在 pom 文件引入依賴(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中集成所有 RocketMQ 客戶端的所有功能,通過簡(jiǎn)單的注解使用即可完成消息的收發(fā)。在 RocketMQ-Spring Github Wiki?中有更加詳細(xì)的用法和常見問題解答。
據(jù)統(tǒng)計(jì),從 RocketMQ-Spring 發(fā)布第一個(gè)正式版本以來,RocketMQ-Spring 完成 16 個(gè) bug 修復(fù),37 個(gè) imporvement,其中包括事務(wù)消息重構(gòu),消息過濾、消息序列化、多實(shí)例 RocketMQTemplate 優(yōu)化等重要優(yōu)化,歡迎更多的小伙伴能參與到 RocketMQ 社區(qū)的建設(shè)中來,羅美琪(RocketMQ)和春波特(Spring Boot)的故事還在繼續(xù)…釘釘搜索群號(hào):21982288,即可進(jìn)群和眾多開發(fā)者交流!
總結(jié)
以上是生活随笔為你收集整理的RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 托管节点池助力用户构建稳定自愈的 Kub
- 下一篇: Spring Boot Admin 集成