日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Stream 体系及原理介绍

發布時間:2025/4/5 javascript 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Stream 体系及原理介绍 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg

Spring Cloud Stream?在 Spring Cloud 體系內用于構建高度可擴展的基于事件驅動的微服務,其目的是為了簡化消息在 Spring Cloud 應用程序中的開發。

?

Spring Cloud Stream?(后面以 SCS 代替 Spring Cloud Stream)?本身內容很多,而且它還有很多外部的依賴,想要熟悉 SCS,必須要先了解 Spring Messaging 和 Spring Integration 這兩個項目,接下來,文章將從圍繞以下三點進行展開:

?

  • 什么是 Spring Messaging;

  • 什么是 Spring Integration;

  • 什么是 SCS 體系及其原理;

?

Spring Messaging


Spring Messaging 是 Spring Framework 中的一個模塊,其作用就是統一消息的編程模型。

  • 比如消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header:

?

package?org.springframework.messaging;
public?interface?Message<T>?{
? ? T?getPayload();
? ? MessageHeaders?getHeaders();
}

  • 消息通道 MessageChannel 用于接收消息,調用 send 方法可以將消息發送至該消息通道中 :

?

@FunctionalInterface
public?interface?MessageChannel?{
? ? long?INDEFINITE_TIMEOUT?=?-1;
? ? default?boolean?send(Message<?>?message) {

? ? ? ? ?return?send(message,?INDEFINITE_TIMEOUT);

? ? ?}
? ? ?boolean?send(Message<?>?message,?long?timeout);
}

消息通道里的消息如何被消費呢?
  • 由消息通道的子接口可訂閱的消息通道 SubscribableChannel 實現,被 MessageHandler 消息處理器所訂閱:

public?interface?SubscribableChannel?extends?MessageChannel?{
? ? boolean?subscribe(MessageHandler?handler);
? ? boolean?unsubscribe(MessageHandler?handler);
}

  • 由MessageHandler 真正地消費/處理消息:

@FunctionalInterface
public?interface?MessageHandler?{
? ? void?handleMessage(Message<?>?message)?throws?MessagingException;
}

Spring Messaging 內部在消息模型的基礎上衍生出了其它的一些功能,如:

1. 消息接收參數及返回值處理:消息接收參數處理器 HandlerMethodArgumentResolver 配合 @Header, @Payload 等注解使用;消息接收后的返回值處理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;

2. 消息體內容轉換器 MessageConverter;

3. 統一抽象的消息發送模板 AbstractMessageSendingTemplate;

4. 消息通道攔截器 ChannelInterceptor;

?

Spring Integration


Spring Integration 提供了 Spring 編程模型的擴展用來支持企業集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。

它提出了不少新的概念,包括消息路由 MessageRoute、消息分發 MessageDispatcher、消息過濾 Filter、消息轉換 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同時還提供了 MessageChannel 和MessageHandler 的實現,分別包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel 和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等內容。

這里為大家介紹幾種消息的處理方式:
  • 消息的分割:

?

  • 消息的聚合:

?

?

  • 消息的過濾:

?

  • 消息的分發:

?

?

接下來,我們以一個最簡單的例子來嘗試一下 Spring Integration:

這段代碼解釋為:

?

SubscribableChannel messageChannel =new DirectChannel(); // 1

messageChannel.subscribe(msg-> { // 2
?System.out.println("receive: " +msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3

?

1. 構造一個可訂閱的消息通道 messageChannel;

2. 使用 MessageHandler 去消費這個消息通道里的消息;

3. 發送一條消息到這個消息通道,消息最終被消息通道里的 MessageHandler 所消費。

最后控制臺打印出: receive: msg from alibaba;

DirectChannel 內部有個 UnicastingDispatcher 類型的消息分發器,會分發到對應的消息通道 MessageChannel 中,從名字也可以看出來,UnicastingDispatcher 是個單播的分發器,只能選擇一個消息通道。那么如何選擇呢? 內部提供了 LoadBalancingStrategy 負載均衡策略,默認只有輪詢的實現,可以進行擴展。

我們對上段代碼做一點修改,使用多個 MessageHandler 去處理消息:

SubscribableChannel?messageChannel?=?new?DirectChannel();

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

由于 DirectChannel 內部的消息分發器是 UnicastingDispatcher 單播的方式,并且采用輪詢的負載均衡策略,所以這里兩次的消費分別對應這兩個 MessageHandler。控制臺打印出:

receive1: msg from alibaba
receive2: msg from alibaba

既然存在單播的消息分發器 UnicastingDispatcher,必然也會存在廣播的消息分發器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 這個消息通道所使用。廣播消息分發器會把消息分發給所有的 MessageHandler:

SubscribableChannel?messageChannel?=?new?PublishSubscribeChannel();

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

發送兩個消息,都被所有的 MessageHandler 所消費。控制臺打印:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba

?

Spring Cloud Stream


SCS與各模塊之間的關系是:

  • SCS 在 Spring Integration 的基礎上進行了封裝,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;

  • SCS 與 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;

  • SCS 與 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置類;

  • SCS 增強了消息發送失敗的和消費失敗情況下的處理邏輯等功能。

  • SCS 是 Spring Integration 的加強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現細節,希望以統一的一套 API 來進行消息的發送/消費,底層消息中間件的實現細節由各消息中間件的 Binder 完成。

Binder 是提供與外部消息中間件集成的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用于構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了 RocketMQ Binder。

?

從圖中可以看出,Binding 是連接應用程序跟消息中間件的橋梁,用于消息的消費和生產。我們來看一個最簡單的使用 RocketMQ Binder 的例子,然后分析一下它的底層處理原理:

  • 啟動類及消息的發送:

@SpringBootApplication
@EnableBinding({?Source.class,?Sink.class?})?// 1
public?class?SendAndReceiveApplication?{
?
? ? public?static?void?main(String[]?args) {
? ? ? ? SpringApplication.run(SendAndReceiveApplication.class,?args);
? ? }
?
? ? ? ?@Bean?// 2
? ? public?CustomRunner?customRunner() {
? ? ? ? return?new?CustomRunner();
? ? }

? ? public?static?class?CustomRunner?implements?CommandLineRunner?{

? ? ? ? @Autowired
? ? ? ? private?Source?source;

? ? ? ? @Override
? ? ? ? public?void?run(String...?args)?throws?Exception?{
? ? ? ? ? ? int?count?=?5;
? ? ? ? ? ? for?(int?index?=?1;?index?<=?count;?index++) {
? ? ? ? ? ? ? ? source.output().send(MessageBuilder.withPayload("msg-"?+?index).build());?// 3
? ? ? ? ? ? }
? ? ? ? }
? ? }
}

  • 消息的接收:

@Service
public?class?StreamListenerReceiveService?{

? ? @StreamListener(Sink.INPUT)?// 4
? ? public?void?receiveByStreamListener1(String?receiveMsg) {
? ? ? ? System.out.println("receiveByStreamListener: "?+?receiveMsg);
? ? }

}

這段代碼很簡單,沒有涉及到 RocketMQ 相關的代碼,消息的發送和接收都是基于 SCS 體系完成的。如果想切換成 RabbitMQ 或 Kafka,只需修改配置文件即可,代碼無需修改。

我們來分析下這段代碼的原理:

?

1.?@EnableBinding?對應的兩個接口屬性?Source?和?Sink?是 SCS 內部提供的。SCS 內部會基于?Source?和?Sink?構造?BindableProxyFactory,且對應的 output 和 input 方法返回的 MessageChannel 是?DirectChannel。output 和 input 方法修飾的注解對應的 value 是配置文件中 binding 的 name。

public?interface?Source?{
? ? String?OUTPUT?=?"output";
? ? @Output(Source.OUTPUT)
? ? MessageChannel?output();
}
public?interface?Sink?{
? ? String?INPUT?=?"input";
? ? @Input(Sink.INPUT)
? ? SubscribableChannel?input();
}

配置文件里 bindings 的 name 為 output 和 input,對應 Source 和 Sink 接口的方法上的注解里的 value:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1

2. 構造 CommandLineRunner,程序啟動的時候會執行 CustomRunner 的 run 方法。

3. 調用 Source 接口里的 output 方法獲取 DirectChannel,并發送消息到這個消息通道中。這里跟之前 Spring Integration 章節里的代碼一致。

  • Source 里的 output 發送消息到 DirectChannel 消息通道之后會被 AbstractMessageChannelBinder#SendingHandler 這個 MessageHandler 處理,然后它會委托給 AbstractMessageChannelBinder#createProducerMessageHandler 創建的 MessageHandler 處理(該方法由不同的消息中間件實現);

  • 不同的消息中間件對應的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 內部會把 Spring Message 轉換成對應中間件的 Message 模型并發送到對應中間件的 broker;

4. 使用 @StreamListener 進行消息的訂閱。請注意,注解里的 Sink.input 對應的值是 "input",會根據配置文件里 binding 對應的 name 為 input 的值進行配置:

  • 不同的消息中間件對應的 AbstractMessageChannelBinder#createConsumerEndpoint 方法會使用 Consumer 訂閱消息,訂閱到消息后內部會把中間件對應的 Message 模型轉換成 Spring Message;

  • 消息轉換之后會把 Spring Message 發送至 name 為 input 的消息通道中;

  • @StreamListener 對應的 StreamListenerMessageHandler 訂閱了 name 為 input 的消息通道,進行了消息的消費;

這個過程文字描述有點啰嗦,用一張圖總結一下(黃色部分涉及到各消息中間件的 Binder 實現以及 MQ 基本的訂閱發布功能):

?

SCS 章節的最后,我們來看一段 SCS 關于消息的處理方式的一段代碼:

@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='1'")
public?void?receiveByHeader(Message?msg) {
? ? ?System.out.println("receive by headers['index']=='1': "?+?msg);
}

@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='9999'")
public?void?receivePerson(@Payload?Person?person) {
? ? ?System.out.println("receive Person: "?+?person);
}

@StreamListener(value?=?Sink.INPUT)
public?void?receiveAllMsg(String?msg) {
? ? ?System.out.println("receive allMsg by StreamListener. content: "?+?msg);
}

@StreamListener(value?=?Sink.INPUT)
public?void?receiveHeaderAndMsg(@Header("index")?String?index,?Message?msg) {
? ? ?System.out.println("receive by HeaderAndMsg by StreamListener. content: "?+?msg);
}

有沒有發現這段代碼跟 Spring MVC Controller 中接收請求的代碼很像? 實際上他們的架構都是類似的,Spring MVC 對于 Controller 中參數和返回值的處理類分別是org.springframework.web.method.support.HandlerMethodArgumentResolver、 org.springframework.web.method.support.HandlerMethodReturnValueHandler。

Spring Messaging 中對于參數和返回值的處理類之前也提到過,分別是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler。

它們的類名一模一樣,甚至內部的方法名也一樣。

?

總結


?

上圖是 SCS 體系相關類說明的總結,關于 SCS 以及 RocketMQ Binder 更多相關的示例,可以參考 RocketMQ Binder Demos(Demos 地址:點擊“閱讀原文”),包含了消息的聚合、分割、過濾;消息異常處理;消息標簽、SQL過濾;同步、異步消費等等。

下一篇文章,我們將分析消息總線(Spring Cloud Bus) 在 Spring Cloud 體系中的作用,并逐步展開,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何實現 Spring Cloud Stream 標準的。

轉載于:https://www.cnblogs.com/davidwang456/articles/10653269.html

總結

以上是生活随笔為你收集整理的Spring Cloud Stream 体系及原理介绍的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 成人毛片观看 | 亚洲黄片一区二区三区 | 色哟哟一区二区三区 | 中文字幕+乱码+中文 | 天天综合欧美 | 精品国产日本 | 日韩久久影视 | 男人的天堂色 | 污污动态图 | 中国成熟妇女毛茸茸 | 亚洲国产成人精品久久久 | 日韩黄色短视频 | 3d动漫精品啪啪一区二区三区免费 | 国产黄色av片 | 欧美xxxx喷水 | 国产日韩欧美精品在线 | 狠操av| 日xxxx| 18禁一区二区 | 伊人222成人综合网 亚洲日本中文 | 欧美片 | 美女扒开腿让男人操 | 学生调教贱奴丨vk | 午夜九九九| av黄色在线观看 | 精品久久久久久无码人妻 | 网站一区二区 | 亚洲30p | 波多野结衣视频免费观看 | 色哟哟国产精品色哟哟 | 成人免费在线观看网站 | 日本少妇做爰全过程毛片 | 男阳茎进女阳道视频大全 | 午夜影视福利 | 成人激情电影在线观看 | 成人国产片 | 欧美日韩一级二级三级 | 欧美一二三视频 | 日本在线一级片 | 亚洲免费色 | 伊人精品在线观看 | 99热影院| 欧美日韩电影一区 | 国产精品国产三级国产a | a∨鲁丝一区鲁丝二区鲁丝三区 | 看a网站| 亚洲一区二区三区四区在线 | 中文字幕高清在线免费播放 | 插插射射 | 亚洲欧美在线免费 | 成人福利一区 | 香蕉视频网站 | 日本一级淫片色费放 | 亚洲精品午夜国产va久久成人 | av青青草原 | 中文字幕一区二区三区四区视频 | 黄色av网址在线 | 四虎影院在线观看免费 | 美女日批视频在线观看 | 北条麻妃一区二区三区四区五区 | 久草www | 国产女人叫床高潮大片免费 | 国产精品videossex国产高清 | 成人在线一区二区三区 | 9色视频在线观看 | 国产女女做受ⅹxx高潮 | 久久成人精品 | 免费在线播放视频 | 天天曰天天操 | 河北彩花69xx精品一区 | 欧美国产在线观看 | 美女被男人插 | 成人在线观看黄色 | 成人特级毛片 | 精品国产亚洲av麻豆 | 午夜一区二区三区免费观看 | 精品视频在线观看免费 | 福利毛片| 日韩一级黄色 | 精品成人一区二区三区久久精品 | 一区二区三区免费看 | 最新国产拍偷乱偷精品 | 日本少妇久久 | 丁香六月激情 | 伊人久久精品视频 | 国产chinese男男gaygay视频 | 亚洲国内精品 | 国产成人欧美一区二区三区91 | 免费成年人视频在线观看 | 国产在线看一区 | 国产高清免费视频 | 91传媒在线视频 | 在线一级片 | 亚洲天堂精品在线 | 国产专区精品 | av五十路 | 黑料网在线观看 | 久久免费视频一区二区 | 69精品视频 |