日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Kafka的Spring Cloud Stream

發布時間:2023/12/3 javascript 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka的Spring Cloud Stream 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

總覽

該示例項目演示了如何使用事件驅動的體系結構 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok構建實時流應用程序。

在本教程結束時,您將運行一個簡單的基于Spring Boot的Greetings微服務

  • 從REST API獲取消息
  • 將其寫入Kafka主題
  • 從主題中讀取
  • 將其輸出到控制臺
  • 讓我們開始吧!

    順便說一句,您可以在此處找到源代碼。

    什么是Spring Cloud Streaming?

    Spring Cloud Stream是基于Spring Boot構建的框架,用于構建消息驅動的微服務。

    什么是卡夫卡?

    Kafka是最初由LinkedIn開發的流行的高性能和水平可伸縮的消息傳遞平臺。

    安裝Kafka

    從這里下載Kafka并將其解壓縮:

    >?tar -xzf kafka_2.11-1.0.0.tgz > cd kafka_2.11-1.0.0

    啟動Zookeeper和Kafka

    在Windows上:

    >?bin\windows\zookeeper-server-start.bat config\zookeeper.properties > bin\windows\kafka-server-start.bat config\server.properties

    在Linux或Mac上:

    >?bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties

    如果計算機從休眠狀態喚醒后,Kafka沒有運行并且無法啟動,請刪除<TMP_DIR>/kafka-logs文件夾,然后再次啟動Kafka。

    什么是Lombok?

    Lombok是一個Java框架,可在代碼中自動生成getter,setter,toString(),構建器,記錄器等。

    Maven依賴

    轉到https://start.spring.io創建一個Maven項目:

  • 添加必要的依賴項: Spring Cloud Stream , Kafka , Devtools (用于在開發過程中進行熱重新部署,可選), Actuator (用于監視應用程序,可選), Lombok (確保在IDE中也安裝了Lombok插件)
  • 單擊生成項目按鈕以zip文件形式下載項目
  • 解壓縮zip文件并將maven項目導入到您喜歡的IDE
  • 注意pom.xml文件中的Maven依賴項:

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><!-- Also install the Lombok plugin in your IDE --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency>

    …還有<dependencyManagement>部分:

    <dependencyManagement><dependencies><dependency><!-- Import dependency management from Spring Boot --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>${spring-cloud-stream.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>

    …和<repository>部分:

    <repository><id>spring-milestones</id><name>Spring Milestones</name><url>http://repo.spring.io/libs-milestone</url><snapshots><enabled>false</enabled></snapshots> </repository>

    定義卡夫卡流

    package com.kaviddiss.streamkafka.stream;import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel;? public interface GreetingsStreams {String INPUT = "greetings-in";String OUTPUT = "greetings-out";@Input(INPUT)SubscribableChannel inboundGreetings();@Output(OUTPUT)MessageChannel outboundGreetings(); }

    為了使我們的應用程序能夠與Kafka進行通信,我們需要定義一個出站流以將消息寫入Kafka主題,并定義一個入站流以讀取來自Kafka主題的消息。

    通過簡單地創建一個接口為每個流定義單獨的方法,Spring Cloud提供了一種方便的方法。

    inboundGreetings()方法定義要從Kafka讀取的入站流,而outboundGreetings()方法定義要寫入Kafka的出站流。

    在運行時,Spring將為GreetingsStreams接口創建一個基于Java代理的實現,該實現可以作為Spring Bean注入到代碼中的任何位置,以訪問我們的兩個流。

    配置Spring Cloud Stream

    下一步是將Spring Cloud Stream配置為綁定到GreetingsStreams接口中的流。 這可以通過使用以下代碼創建@Configuration類com.kaviddiss.streamkafka.config.StreamsConfig來完成:

    package com.kaviddiss.streamkafka.config;import com.kaviddiss.streamkafka.stream.GreetingsStreams; import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class) public class StreamsConfig { }

    使用@EnableBinding批注(將GreatingsService接口傳遞到該批注)完成@EnableBinding的GreatingsService 。

    Kafka的配置屬性

    默認情況下,配置屬性存儲在src/main/resources/application.properties文件中。

    但是,我更喜歡使用YAML格式,因為它不太冗長,并且允許將公共屬性和特定于環境的屬性保留在同一文件中。

    現在,讓我們將application.properties重命名為application.yaml并將config片段下方粘貼到文件中:

    spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:greetings-in:destination: greetingscontentType: application/jsongreetings-out:destination: greetingscontentType: application/json

    上面的配置屬性配置要連接的Kafka服務器的地址,以及我們用于代碼中的入站和出站流的Kafka主題。 他們倆都必須使用相同的Kafka主題!

    contentType屬性告訴Spring Cloud Stream在流中以String的形式發送/接收我們的消息對象。

    創建消息對象

    使用下面的代碼創建一個簡單的com.kaviddiss.streamkafka.model.Greetings類,該代碼將表示我們從中讀取并寫入的greetings Kafka主題:

    package com.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/): import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.ToString;@Getter @Setter @ToString @Builder public class Greetings {private long timestamp;private String message; }

    注意,由于Lombok批注,該類如何沒有任何getter和setter。 @ToString將使用類的字段生成toString()方法,而@Builder批注將允許我們使用流暢的生成器創建Greetings對象(請參見下文)。

    創建服務層以寫入Kafka

    讓我們創建的com.kaviddiss.streamkafka.service.GreetingsService下面的代碼,將寫一個類Greetings對象的greetings卡夫卡話題:

    package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils;@Service @Slf4j public class GreetingsService {private final GreetingsStreams greetingsStreams;public GreetingsService(GreetingsStreams greetingsStreams) {this.greetingsStreams = greetingsStreams;}public void sendGreeting(final Greetings greetings) {log.info("Sending greetings {}", greetings);MessageChannel messageChannel = greetingsStreams.outboundGreetings();messageChannel.send(MessageBuilder.withPayload(greetings).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());}

    @Service批注會將此類配置為Spring Bean,并通過構造函數注入GreetingsService依賴項。

    @Slf4j批注將生成一個SLF4J記錄器字段,可用于記錄日志。

    在sendGreeting()方法中,我們使用注入的GreetingsStream對象發送由Greetings對象表示的消息。

    創建REST API

    現在,我們將創建一個REST api端點,該端點將觸發使用GreetingsService Spring Bean向Kafka發送消息:

    package com.kaviddiss.streamkafka.web;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.service.GreetingsService; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController;?@RestController public class GreetingsController {private final GreetingsService greetingsService;public GreetingsController(GreetingsService greetingsService) {this.greetingsService = greetingsService;}@GetMapping("/greetings")@ResponseStatus(HttpStatus.ACCEPTED)public void greetings(@RequestParam("message") String message) {Greetings greetings = Greetings.builder().message(message).timestamp(System.currentTimeMillis()).build();greetingsService.sendGreeting(greetings);} }

    @RestController注釋告訴Spring這是一個Controller bean(MVC中的C)。 greetings()方法定義一個HTTP GET /greetings端點,該端點接受message請求參數,并將其傳遞給GreetingsService的sendGreeting()方法。

    聽問候卡夫卡主題

    讓我們創建一個com.kaviddiss.streamkafka.service.GreetingsListener類,該類將偵聽greetings Kafka主題上的消息并將其記錄在控制臺上:

    package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;@Component @Slf4j public class GreetingsListener {@StreamListener(GreetingsStreams.INPUT)public void handleGreetings(@Payload Greetings greetings) {log.info("Received greetings: {}", greetings);} }

    @Component批注類似于@Service @Component , @Service @RestController定義了一個Spring Bean。

    GreetingsListener有一個方法, handleGreetings()將通過云春流與每一個新的調用Greetings的消息對象greetings卡夫卡的話題。 這要感謝為handleGreetings()方法配置的@StreamListener批注。

    運行應用程序

    最后一個難題是由Spring Initializer自動生成的com.kaviddiss.streamkafka.StreamKafkaApplication類:

    package com.kaviddiss.streamkafka;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class StreamKafkaApplication {public static void main(String[] args) {SpringApplication.run(StreamKafkaApplication.class, args);} }

    無需在此處進行任何更改。 您可以在您的IDE中將此類作為Java應用程序運行,也可以使用Spring Boot maven插件從命令行運行該應用程序:

    >?mvn spring-boot:run

    應用程序運行后,在瀏覽器中轉到http:// localhost:8080 / greetings?message = hello并檢查您的控制臺。

    摘要

    我希望您喜歡本教程。 隨時提出任何問題并留下您的反饋。

    翻譯自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html

    總結

    以上是生活随笔為你收集整理的Kafka的Spring Cloud Stream的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。