javascript
引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream
我最近在Sprint Central的工程博客上閱讀了Josh Long的Bootiful GCP系列 ,特別喜歡關于使用Google Cloud的Pub / Sub的第四部分 。 我受到該系列的啟發,同時我還在為我的一個新項目評估Spring Cloud Stream。 我以為,我會繼續討論喬希(Josh)停下的那篇文章。 本文介紹了如何將Spring Cloud Stream與Google Cloud Pub / Sub一起使用,以實現簡單的生產者和使用者應用程序。
介紹
如果您之前閱讀過Josh的文章,則可以安全地跳過此部分。 如果您還沒有這樣做,請不用擔心,我將在此處快速總結一些關鍵點。
什么是Google Cloud Pub / Sub?
Google通過以下方式定義發布/訂閱 。
Cloud Pub / Sub將面向企業消息的中間件的可伸縮性,靈活性和可靠性帶到了云中。 通過提供將發送者和接收者分離的多對多異步消息傳遞,它可以在獨立編寫的應用程序之間進行安全且高度可用的通信。
https://cloud.google.com/pubsub/docs/overview簡而言之,Pub / Sub是Google的解決方案,用于支持開發人員將應用程序組件與Google規模的消息代理連接起來。 顧名思義,此解決方案使用您期望的相同概念來實現發布/訂閱機制。 可以將郵件提交到主題,并且某個主題的所有訂閱者都可以接收已發布的消息。
在這里需要強調的是,Pub / Sub為每個提交的消息至少提供一次傳遞。 如果要確保只發送一次消息,則必須自己照顧。
什么是Spring Integration?
Spring Integration是其投資組合中的一個Spring項目。 整篇文章甚至整本書都可以寫在上面,因為它本身就是一個巨大的框架。 總之,Spring Integration是一個框架,可以幫助您使用EIP模式設計和集成應用程序。 Spring Integration構建的兩個最基本的原語是Message<T>和MessageChannel 。 在這方面,開發人員可以使組件彼此分離和隔離。 您可以想到這種機制,就好像Spring Integration將以某種方式甚至不需要組件彼此了解而是通過交換消息來進一步依賴注入的想法一樣。
通道可以將組件彼此連接,如果它們位于相同的JVM中,或者即使它們是由網絡分布和分隔的。 此時,要了解的相關概念是什么是通道適配器。 它們基本上是用來將Spring Framework消息通過消息通道時轉換為一段可由外部系統使用的數據。
Spring Integration提供了許多適配器,可以幫助開發人員連接數據庫,消息代理和許多其他外部系統。 在這種情況下,將使用適配器向Google Cloud Pub / Sub提交消息或從Google Cloud Pub / Sub接收消息。 Spring Cloud GCP項目為Pub / Sub提供了入站和出站適配器,從Spring Integration消息流的角度來看,這使得消息交換變得透明。
如果您閱讀Josh的文章 ,他的工作是他正在介紹Spring Integration,以一種干凈,一致的方式使用Pub / Sub。 這意味著將刪除PubSubTemplate的直接引用,因此,如果您想將該文章中的示例修改為例如RabbitMQ,您要做的就是相應地替換通道適配器。
什么是Spring Cloud Stream?
消息傳遞非常適合微服務世界,在微服務世界中,一組分布式組件相互通信。 由于消息和渠道是Spring Integration中的頭等公民,因此非常適合。 另一方面,Spring Integration是專門為實現那些EIP模式而設計的。
但是,在現代應用程序開發中,我們不一定要與舊系統集成,在這種情況下,我們寧愿與RabbitMQ , Apache Kafka等現代消息代理集成,也要與GCP Pub / Sub集成。 就是說,就能夠與各種外部系統集成而言,我們不需要Spring Integration的全部功能。 這種額外的靈活性要求我們配置適配器,而這是我們不需要的。 如果我們僅使用GCP Pub / Sub或前面提到的任何其他現代消息代理,那么必須為每個單個組件定義和配置適配器就變得很麻煩。
我們確實希望能夠靈活地處理消息,并且希望利用消息代理,但是我們不想編寫Spring Integration所需的太多代碼。 Spring Cloud Stream建立在Spring Integration之上,并利用了相同的原語(例如消息和通道),但減輕了開發人員的負擔,不必將這些組件連接在一起。 因為渠道是通過特定于中間件的Binder實現連接到外部代理的。
將Spring Cloud Stream與Google Cloud Pub / Sub結合使用
我想我已經充分討論了Spring Cloud Stream,Spring Integration和Google Cloud Pub / Sub的背景。 現在該看一些代碼了。 有兩個非常簡單的Spring Boot應用程序,它們交換一個簡單的字符串作為消息的有效負載。 讓我們從發布者開始。
發行人
這基本上是一個簡單的控制器,它發送一個簡單的String作為消息的有效負載。 如果您以前使用過Spring Integration,則發送部分沒有什么特別的。
@RestController public class PublisherController {private final MessageChannel outgoing;public PublisherController(Channels channels) {outgoing = channels.outgoing();}@PostMapping("/publish/{name}")public void publish(@PathVariable String name) {outgoing.send(MessageBuilder.withPayload("Hello " + name + "!").build());}}有趣的是消息通道如何綁定到實際消息代理的資源。 在第6-8行中,注入了一個bean( Channels ),它似乎持有對傳出消息通道的引用。
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface Channels {@OutputMessageChannel outgoing();}Channels反過來只是一個接口,可以定義任意數量的消息通道并用@Input或@Output標記。 Spring Cloud Stream負責實例化一個代理對象,該代理對象負責返回對MessageChannel對象的引用。
@EnableBinding(Channels.class) @SpringBootApplication public class PubsubPublisherApplication {public static void main(String[] args) {SpringApplication.run(PubsubPublisherApplication.class, args);}}Spring Cloud Stream依賴于Spring Boot和Spring Integration。 所述@EnableBinding注釋標記Channels作為一個可綁定接口和對一個邏輯綁定的域名( outgoing )與目的地。 目的地的含義因活頁夾的不同而不同,對于發布/訂閱,它意味著消息生產者的主題和消息消費者的訂閱。 這些綁定可以在application.yml定義。
spring:cloud:stream:bindings:outgoing:destination: reservations訂戶
訂閱者比發布者更簡單,它只是一個類。
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message;@Slf4j @EnableBinding(Sink.class) @SpringBootApplication public class PubsubSubscriberApplication {public static void main(String[] args) {SpringApplication.run(PubsubSubscriberApplication.class, args);}@StreamListener(Sink.INPUT)public void handleMessage(Message<String> message) {log.info("Received: {}.", message.getPayload());}}這里值得一提的是水槽是什么? 正如我們已經看到的, @EnableBinding可以采用接口,然后該框架隱藏了將入站和出站消息適配器連接到消息通道的復雜性,并且還配置了相關的基礎結構。 大多數應用程序僅向單個通道發送消息或從單個通道接收消息。 這就是Spring Cloud Stream提供Source , Sink和Processor接口以幫助您減少代碼的原因。 就是說,我們也可以為發布者使用Source而不是定義Channels ,但是我想展示框架的功能。
運行演示
為了能夠運行示例,您需要完成以下步驟。
如果已經有一個,則可以跳過此步驟。
如果您不需要安裝任何軟件,我認為會更容易。 默認情況下, Google Cloud Shell隨附了Google Cloud SDK ,Git,Maven和Java。
啟用發布/訂閱API
由于Spring Cloud Stream是一個自以為是的框架,因此在其之上構建的應用程序將自行創建主題和訂閱。 也就是說,在此處手動創建主題和訂閱是可選的。 不過,您必須啟用發布/訂閱API。
% gcloud services enable pubsub.googleapis.com % gcloud pubsub topics create reservations % gcloud pubsub subscriptions create reservations --topic=reservations克隆
% git clone https://github.com/springuni/springuni-examples.git啟動發布者
% cd ~/springuni-examples/spring-cloud/spring-cloud-stream-pubsub-publisher % mvn spring-boot:run啟動訂戶
Google Cloud Shell帶有tmux支持,這也意味著它默認情況下會啟動tmux會話。 當然可以禁用。 重要的一點是,您不必打開新的外殼,只需單擊Ctrl-B和C即可打開一個新窗口。有關更多詳細信息,請參閱Tmux鍵綁定 。
% cd ~/springuni-examples/spring-cloud/spring-cloud-stream-pubsub-subscriber % mvn spring-boot:run發送信息
像以前一樣再次打開一個新窗口并發送消息。
% curl -XPOST http://localhost:8080/publish/test您應該看到訂閱者收到它。
問題
- 您認為如果啟動更多訂閱者會發生什么?
- 他們都會收到同一條消息還是只收到其中一條?
- 那為什么呢?
在下面發表評論,讓我知道您的想法!
結論
我們已經了解了什么是Google Cloud Pub / Sub,什么是Spring Integration,以及為何Spring Cloud Stream建立在Spring Integration上以幫助開發人員更快地創建消息驅動的微服務的原因。 在上面的代碼示例中,我進一步介紹了Josh的示例,并使用Spring Cloud Stream代替了Spring Integration,最終減少了更多代碼。
翻譯自: https://www.javacodegeeks.com/2018/12/bootiful-spring-cloud-stream.html
總結
以上是生活随笔為你收集整理的引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓原生启动器下载(安卓原生启动器)
- 下一篇: spring roo_使用Spring