日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

SpringCloud Stream消息驱动

發(fā)布時(shí)間:2023/12/10 javascript 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringCloud Stream消息驱动 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

為啥有這個(gè)技術(shù)???

1. 這個(gè)stream是操作消息隊(duì)列的,簡(jiǎn)化,學(xué)習(xí)消息隊(duì)列的成本降低。 2. 可操作rabbitMQ兔子message queue,kafaka,可理解為jdbc可操作oracle, mysql.. 3. spring家的技術(shù)學(xué)就完了。。

stream

    • 對(duì)消息驅(qū)動(dòng)需要了解的概念
    • 消息驅(qū)動(dòng)生產(chǎn)者,消費(fèi)者
    • 再建一個(gè)服務(wù),消息消費(fèi)者,問(wèn)題

對(duì)消息驅(qū)動(dòng)需要了解的概念

(1) 網(wǎng)站 文檔
https://spring.io/projects/spring-cloud-stream#overview

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/


(2) 介紹

什么是SpringCloudStream 官方定義Spring Cloud Stream是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。 應(yīng)用程序通過(guò)inputs或者outputs與Spring Cloud Stream中binder對(duì)象交互。 通過(guò)我們配置來(lái)binding(綁定), 而Spring Cloud Stream的binder對(duì)象負(fù)責(zé)與消息中間件交互。 所以,我們只需要搞清楚如何與Spring Cloud Stream 交互就可以訪便使用消息驅(qū)動(dòng)的方式。 通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)。 Spring Cloud Stream為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn), 引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。 目前僅支持RabbitMQ、Kafka.

(3) 標(biāo)準(zhǔn)mq

  • Message

    • 生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
  • 消息通道MessageChannel

    • 消息必須走特定的通道
  • 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器訂閱

    • 消息通道里的消息如何被消費(fèi)呢,誰(shuí)負(fù)責(zé)收發(fā)處理

(4) 為什么用Cloud Stream

1 綁定stream憑什么可以統(tǒng)一底層差異

  • 在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,
    于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性
    通過(guò)定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
    通過(guò)向應(yīng)用程序暴露統(tǒng)- -的Channel通道, 使得應(yīng)用程序不需要再考慮各種不同的消息中間件實(shí)現(xiàn)。

2 binder架構(gòu)

INPUT對(duì)應(yīng)于消費(fèi)者 OUTPUT對(duì)應(yīng)于生產(chǎn)者

(5) Stream中的消息通信方式遵循了發(fā)布-訂閱模式

Topic主題進(jìn)行廣播 在RabbitMQ就是Exchange 交換機(jī) 在kafka中就是Topic

(6) 常用api,注解

消息驅(qū)動(dòng)生產(chǎn)者,消費(fèi)者

  • 生產(chǎn)者
    pom
  • <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
    ymal server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;defaultRabbit: # 表示定義的名稱,用于于binding整合type: rabbit # 消息組件類型environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服務(wù)的整合處理input: # 這個(gè)名字是一個(gè)通道的名稱destination: studyExchange # 表示要使用的Exchange名稱定義content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置eureka:client: # 客戶端進(jìn)行Eureka注冊(cè)的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 設(shè)置心跳的時(shí)間間隔(默認(rèn)是30秒)lease-expiration-duration-in-seconds: 5 # 如果現(xiàn)在超過(guò)了5秒的間隔(默認(rèn)是90秒)instance-id: receive-8802.com # 在信息列表時(shí)顯示主機(jī)名稱prefer-ip-address: true # 訪問(wèn)的路徑變?yōu)镮P地址

    發(fā)送消息接口,實(shí)現(xiàn)

    public interface IMessageProvider {public String send(); }/// 實(shí)現(xiàn) import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.messaging.MessageChannel; import org.springframework.integration.support.MessageBuilder; import javax.annotation.Resource; import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource; import java.util.UUID;@EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output; // 消息發(fā)送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;} }
  • 消費(fèi)者
    yaml, pom同時(shí),yaml要改端口。
    定義controller接收消息
  • import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component;@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消費(fèi)者1號(hào),接受:"+message.getPayload()+"\t port:"+serverPort);}}

    再建一個(gè)服務(wù),消息消費(fèi)者,問(wèn)題

    1. 消息重復(fù)。發(fā)送者發(fā)送消息,兩個(gè)消費(fèi)者都會(huì)接收到消息,如果是支付多個(gè)模塊, 收到一條消息,多個(gè)模塊會(huì)收到壞賬,需要分組,只對(duì)一個(gè)支付模塊發(fā)消息. 2. 消息持久化。當(dāng)關(guān)掉消費(fèi)者,消息丟失。

    a. 新增group配置,自定義group

    group: damn

    b. 持久化,服務(wù)掛了,保證消息不丟失

    頁(yè)數(shù)配置分組解決

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

    總結(jié)

    以上是生活随笔為你收集整理的SpringCloud Stream消息驱动的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。