日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java的Kafka:构建安全,可扩展的消息传递应用程序

發布時間:2023/12/3 java 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java的Kafka:构建安全,可扩展的消息传递应用程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

使用Okta的身份管理平臺輕松部署您的應用程序 使用Okta的API在幾分鐘之內即可對任何應用程序中的用戶進行身份驗證,管理和保護。 今天嘗試Okta。

當今的用戶希望可以通過其計算機,手機,平板電腦或任何其他設備訪問您的應用程序! 這種向軟件即服務(SaaS)規范的過渡要求開發人員有效地與強大的工具進行集成,這些工具可以擴展為每秒處理數千(甚至數百萬)個請求。 Apache Kafka是處理那些高吞吐量環境的最有效工具之一。

在本教程中,您將學習Apache Kafka的基本概念,并構建一個功能齊全的Java應用程序,該應用程序能夠產生和使用來自Kafka的消息。

先決條件: Java 8+,互聯網連接和免費的Okta開發人員帳戶 。

Apache Kafka的簡要概述

Apache Kafka是一個分布式流媒體平臺,它利用發布/訂閱消息模式與應用程序進行交互,并且旨在創建持久消息。

讓我們更詳細地分解這些概念。

分布式流媒體平臺

當您要運行Kafka時,需要啟動其代理:與其他任何服務器一樣,在計算機上運行的Kafka的簡單實例。 代理負責將消息發送,接收和存儲到磁盤中。

一個經紀人不足以確保Kafka可以處理高吞吐量的消息。 該目標是通過許多經紀人同時合作,相互溝通和協調來實現的。

Kafka集群將一個或多個經紀人組合在一起。 您的應用程序連接到一個群集,該群集為您管理所有分布式詳細信息,而不是連接到單個節點。

具有持久消息的發布/訂閱消息系統

發布/訂閱是分布式系統中的常見模式。 下圖說明了Kafka中此模式的基本結構:

該圖像包括到目前為止尚未提及的兩個組件:生產者和消費者。

生產者是將消息發送到群集的應用程序。 在此示例中,生產者1、2和3正在發送消息。 然后,集群選擇應由哪個代理存儲它們,并將其發送給選定的代理。

另一方面,您有消費者。 使用者是連接到群集并接收來自生產者的消息的應用程序。 任何有興趣使用生產者發送的消息的應用程序都必須連接到Kafka消費者。

由于Kafka會長時間存儲消息(默認值為7天),因此即使發送消息時不在場,您也可以讓許多使用者收到相同的消息!

卡夫卡主題

將消息發送到Kafka代理時,需要通過指定主題來指定將消息發送到的位置。 主題是消費者可以訂閱的消息類別。 該機制確保使用者僅接收與其相關的消息,而不是接收發布到集群的每條消息。

現在您已經了解了Kafka的基本體系結構,讓我們下載并安裝它。

安裝并運行Kafka

要下載Kafka, 請訪問Kafka網站 。 將此壓縮文件的內容提取到您喜歡的文件夾中。

在Kafka目錄中,轉到bin文件夾。 在這里,您會發現許多bash腳本,這些腳本對于運行Kafka應用程序很有用。 如果您使用的是Windows,則windows文件夾中也有相同的腳本。 本教程使用Linux命令,但是如果您正在運行Microsoft OS,則只需使用等效的Windows版本。

啟動Zookeeper管理您的Kafka群集

Apache Kafka始終作為分布式應用程序運行。 這意味著您的集群必須在同步配置或選舉負責人的過程中處理一些分布式挑戰。

Kafka使用Zookeeper跟蹤這些細節。 不過,不必擔心下載它。 Kafka已經與Zookeeper一起提供,可以讓您快速啟動并運行。

讓我們啟動一個Zookeeper實例! 在您的Kafka目錄中的bin文件夾中,運行以下命令:

./zookeeper-server-start.sh ../config/zookeeper.properties

默認情況下,此命令在端口2181上啟動Zookeeper服務器。 Zookeeper負責協調集群內的Kafka經紀人。 您將在本教程的Kafka項目中使用默認配置,但始終可以根據需要更改這些值。

運行卡夫卡經紀人

下一步是運行代理本身。 在另一個終端上,從bin文件夾運行以下命令:

./kafka-server-start.sh ../config/server.properties

您可能已經猜到了,此命令在默認端口9092上以默認配置運行Kafka服務器。

創建一個Kafka主題

現在您已運行代理和Zookeeper,現在可以指定一個主題以開始從生產者發送消息。 您將在bin文件夾中運行命令,就像前面的步驟一樣:

./kafka-topics.sh --create --topic myTopic -zookeeper \localhost:2181 --replication-factor 1 --partitions 1

該命令創建一個名為myTopic ,該myTopic指向您使用第一個命令啟動的Zookeeper實例。 您還必須指定兩個不同的參數: replication-factor和partitions 。 現在不用擔心它們-它們用于控制與Kafka中的分布式系統相關的特定方面。 在運行簡單設置時,可以為兩個參數都指定“ 1”。

既然一切就緒并開始運行,您就可以開始將Kafka與Java應用程序集成!

創建一個Java + Kafka應用程序

讓我們從項目結構開始,使用Spring Initializer創建應用程序。

轉到https://start.spring.io并填寫以下信息:

  • 項目:Maven項目
  • 語言:Java
  • 群組: com.okta.javakafka
  • 工件: kafka-java
  • 依存關系:
    • Spring網

您也可以使用命令行生成項目。 將以下命令粘貼到您的終端中,它將下載與上面定義的配置相同的項目:

curl https://start.spring.io/starter.zip -d language=java \-d dependencies=web,kafka \-d packageName=com.okta.javakafka \-d name=kafka-java \-d type=maven-project \-o kafka-java.zip

本教程使用Maven,但是您可以根據需要輕松地使用Gradle進行學習。

而已! 現在,您的Java項目結構已創建,您可以開始開發應用程序了。

在Java應用程序中將消息推送到Kafka主題

創建可以推送消息的生產者的第一步是在Java應用程序中配置生產者。 讓我們創建一個配置類來做到這一點。

創建一個src/main/java/com/okta/javakafka/configuration文件夾,并在其中創建一個ProducerConfiguration類:

import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap; import java.util.Map;@Configuration public class ProducerConfiguration {private static final String KAFKA_BROKER = "localhost:9092";@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigurations());}@Beanpublic Map<String, Object> producerConfigurations() {Map<String, Object> configurations = new HashMap<>();configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return configurations;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}

此類創建一個ProducerFactory ,該ProducerFactory知道如何根據您提供的配置創建生產者。 您還指定了連接到本地Kafka代理,并使用String序列化密鑰和值。

您還聲明了一個KafkaTemplate bean,以對生產者執行高級操作。 換句話說,該模板能夠執行諸如將消息發送到主題之類的操作,并有效地向您隱藏了后臺信息。

下一步是創建端點,以將消息發送給生產者。 在src/main/java/com/okta/javakafka/controller軟件包中,創建以下類:

import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController public class KafkaController {private KafkaTemplate<String, String> template;public KafkaController(KafkaTemplate<String, String> template) {this.template = template;}@GetMapping("/kafka/produce")public void produce(@RequestParam String message) {template.send("myTopic", message);}}

注意:由于您要發送要處理的數據,所以produce()方法實際上應該是POST。 出于演示目的,將其保留為GET更容易,因此您可以在瀏覽器中進行練習。

如您所見,此端點非常簡單。 它噴射KafkaTemplate前面配置和發送一個消息給myTopic當GET請求到由/kafka/produce 。

讓我們測試一切是否按預期進行。 在JavaKafkaApplication類中運行main方法。 要從命令行運行,請執行以下命令:

./mvnw spring-boot:run

您的服務器應在端口8080上運行,并且您已經可以對它發出API請求!

轉到網絡瀏覽器,然后訪問http:// localhost:8080 / kafka / produce?message =這是我的消息 。

當您使用上述命令進行調用時,您的應用程序將執行/kafka/produce端點,該端點將消息發送到Kafka中的myTopic主題。

但是,您如何知道該命令已成功向該主題發送了消息? 現在,您不會在應用程序內使用消息,這意味著您不能確定!

幸運的是,有一種簡單的方法可以立即創建消費者以進行測試。 在您的Kafka目錄的bin文件夾中,運行以下命令:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

訪問http:// localhost:8080 / kafka / produce?message =這是我的消息,再次在運行Kafka使用者的終端中看到以下消息:

This is my message

做得好! 您可以暫時停止此命令。

讓我們添加一些Java代碼來使用應用程序中的消息,而不是從終端執行。

在Java應用中使用來自Kafka主題的消息

與生產者一樣,您需要添加配置以使消費者能夠找到Kafka經紀人。

在src/main/java/com/okta/javakafka/configuration創建以下類:

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap; import java.util.Map;@Configuration public class ConsumerConfiguration {private static final String KAFKA_BROKER = "localhost:9092";private static final String GROUP_ID = "kafka-sandbox";@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigurations());}@Beanpublic Map<String, Object> consumerConfigurations() {Map<String, Object> configurations = new HashMap<>();configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);configurations.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return configurations;}@BeanConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

上面的代碼創建了一個工廠,該工廠知道如何連接到本地代理。 它還將您的使用者配置為針對鍵和值對String反序列化,以匹配生產者配置。

組ID是強制性的,Kafka使用組ID來允許并行數據消耗。 ConcurrentKafkaListenerContainerFactory bean使您的??應用可以在多個線程中使用消息。

現在,您的Java應用已配置為在您的Kafka經紀人中查找使用者,讓我們開始收聽發送給該主題的消息。

創建一個src/main/java/com/okta/javakafka/consumer目錄,并在其中創建以下類:

import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;import java.util.ArrayList; import java.util.List;@Component public class MyTopicConsumer {private final List<String> messages = new ArrayList<>();@KafkaListener(topics = "myTopic", groupId = "kafka-sandbox")public void listen(String message) {synchronized (messages) {messages.add(message);}}public List<String> getMessages() {return messages;}}

此類負責偵聽myTopic主題內的更改。 它通過使用KafkaListener注釋來實現。 每當生產者向主題發送新消息時,您的應用程序都會在此類內接收到一條消息。 它將一條消息添加到接收到的消息列表中,從而通過getMessages()方法將其提供給其他類。

接下來,讓我們創建一個顯示已消費消息列表的端點。 返回KafkaController以添加MyTopicConsumer作為依賴項和getMessages()方法。

import com.okta.javakafka.consumer.MyTopicConsumer; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController public class KafkaController {private KafkaTemplate<String, String> template;private MyTopicConsumer myTopicConsumer;public KafkaController(KafkaTemplate<String, String> template, MyTopicConsumer myTopicConsumer) {this.template = template;this.myTopicConsumer = myTopicConsumer;}@GetMapping("/kafka/produce")public void produce(@RequestParam String message) {template.send("myTopic", message);}@GetMapping("/kafka/messages")public List<String> getMessages() {return myTopicConsumer.getMessages();}}

此類現在具有一個新的終結點,以顯示存儲在使用者中的消息。 調用此端點時,它將發送已從Kafka主題處理過的當前消息。

您的Java應用程序現在同時具有Kafka生產者和使用者,因此讓我們一起進行測試! 重新啟動您的應用程序,然后轉到http:// localhost:8080 / kafka / messages 。

目前,沒有信息被返回。 原因很簡單:您的使用者僅配置為接收新消息,而您尚未發送新消息。 讓我們通過訪問網絡瀏覽器并訪問http:// localhost:8080 / kafka / produce?message =我的應用程序發送的消息來解決此問題! 。

當Kafka收到該消息時,它將立即讓您的消費者知道它。 繼續并在瀏覽器中轉到http:// localhost:8080 / kafka / messages 。 現在,您將看到您的消息已成功接收!

做得好! 您有一個能夠從Kafka產生和使用消息的Java應用程序! 但是,在我們將其稱為“一天”之前,還有最后一步,這是非常重要的一步。

保護您的Java Kafka應用程序

您的應用目前不是很安全。 盡管您已經準備好在分布式環境中處理許多消息,但是那些可以找到指向您的端點的鏈接的人仍然可以使用這些消息。 這是一個關鍵漏洞,因此請確保已正確解決此漏洞。

您將使用OAuth 2.0來確保只有經過身份驗證的用戶才能看到您的端點。 最好的部分? 使用Okta驗證用戶身份,只需5分鐘即可在您的應用中添加此功能!

創建一個Okta帳戶

如果您還沒有Okta帳戶, 請繼續創建一個 。 完成注冊后,請執行以下步驟:

  • 登錄到您的帳戶
  • 轉到應用程序 > 添加應用程序 。 您將被重定向到以下頁面:
  • 選擇網站 ,然后單擊下一步。
  • 在表格中填寫以下選項:
    • 姓名: Bootiful Kafka
  • 點擊完成

現在您有了Okta應用程序,可以使用它來在Java + Kafka應用程序中對用戶進行身份驗證。

使用用戶身份驗證保護Java應用安全

首先,將Okta的庫添加到您的項目中。 打開您的pom.xml并在<dependencies>標記內添加以下依賴項:

<dependency><groupId>com.okta.spring</groupId><artifactId>okta-spring-boot-starter</artifactId><version>1.3.0</version> </dependency>

該庫將與您剛創建的Okta應用程序集成。 它還會將Spring Security添加到您當前的應用程序中。 使用src/main/resources/application.properties的以下變量對其進行配置:

okta.oauth2.issuer: https://{yourOktaDomain}/oauth2/default okta.oauth2.client-id: {yourClientID} okta.oauth2.client-secret: {yourClientSecret}

重要說明 :此文件只能在本地使用。 不要將客戶的秘密提交給Git或任何其他版本控制系統。

為避免意外暴露這些憑據,您還可以將Okta應用程序的值指定為環境變量。 使用以下環境變量在應用程序的根目錄中創建okta.env文件。 然后在啟動應用程序之前運行source okta.env

export OKTA_OAUTH2_ISSUER=https://{yourOktaDomain}/oauth2/default export OKTA_OAUTH2_CLIENT_ID={yourClientID} export OKTA_OAUTH2_CLIENT_SECRET={yourClientSecret}

您可以在Okta UI的應用程序頁面中找到{yourClientID}和{yourClientSecret} 。 要訪問它,請按照以下步驟操作:

  • 在您的Okta菜單中,轉到“ 應用程序”
  • 選擇Bootiful Kafka應用程序
  • 單擊常規選項卡

您應該在“客戶端憑據”區域內看到兩個值。

值{yourOktaDomain}將在您的Okta儀表板中可見,只需單擊菜單上的儀表板。 您將在右上角看到組織URL。

而已!

重新啟動Spring Boot應用程序,然后轉到http:// localhost:8080 / kafka / messages 。 您的應用程序現在會將您重定向到登錄頁面:

注意:如果不提示您登錄,那是因為您已經登錄。在隱身窗口中打開您的應用程序,您將看到上面顯示的登錄屏幕。

輸入您的用戶名和密碼。 如果登錄嘗試成功,您將再次被重定向回您的應用程序。

恭喜你! 您現在有了一個安全的Java應用程序,該應用程序可以生成和使用來自Kafka的消息。

如果您想查看本教程的完整源代碼,請轉到 GitHub上的oktadeveloper / okta-java-kafka-example 。

想更多地了解Java,安全性和OAuth 2.0? 以下是您可能感興趣的一些鏈接:

  • OAuth 2.0 Java指南:5分鐘保護您的應用程序安全
  • OAuth和OpenID Connect圖解指南
  • 使用Spring Boot和Kotlin構建應用程序
  • 帶有Spring Boot和Spring Cloud的Java微服務
  • 使用Spring Cloud Gateway保護反應式微服務

有關此類文章的更多信息, 請在Twitter上關注@oktadev 。 我們還會定期將截屏視頻發布到我們的YouTube頻道 !

使用Okta的身份管理平臺輕松部署您的應用程序 使用Okta的API在幾分鐘之內即可對任何應用程序中的用戶進行身份驗證,管理和保護。 今天嘗試Okta。


翻譯自: https://www.javacodegeeks.com/2020/01/kafka-with-java-build-a-secure-scalable-messaging-app.html

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Java的Kafka:构建安全,可扩展的消息传递应用程序的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。