日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Springboot集成使用阿里云kafka详细步骤

發(fā)布時間:2023/12/18 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Springboot集成使用阿里云kafka详细步骤 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載請注明出處:Springboot集成使用阿里云kafka詳細(xì)步驟

明確連接認(rèn)證類型

首先要明確使用哪種連接認(rèn)證類型

Ons模式參考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/beta

Ons模式的conf內(nèi)容

KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX"; };

Plain模式參考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/vpc-ssl

Plain模式的conf內(nèi)容

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx"; };

分別在這兩個帖子中下載對應(yīng)的jks證書和conf文件。

或者參考代碼的相應(yīng)目錄下

注意,這兩個配置都不能打包到j(luò)ar包中,否則容易無法識別和出問題,所以我們需要放在服務(wù)的明確路徑里。

例如/jar/kafka_client_jaas.conf和/jar/kafka.client.truststore.jks

集成

springboot版本為1.5.2。

引入kafka-client的jar包

在項目的pom文件中添加kafka-clients并且排除spring-kafka中的kafka-clients。

因為spring-kafka目前最新版本為2.1.2,其依賴的kafka-clients是1.0.x,但Kafka 服務(wù)端版本是 0.10,Client 版本建議 0.10,所以此處需排除依賴重新引入,否則一直報錯:disconnected

如下:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>

新建KafkaAliyunConfiguration類

KafkaAliyunConfiguration.java

package com.biologic.util;import java.net.URL; import java.util.HashMap; import java.util.Map;import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.util.StringUtils;@Configuration @EnableKafka public class KafkaAliyunConfiguration {@Value("${kafka.broker.address}")private String brokerAddress;@Value("${kafka.sample.topic}")private String defaultTopic;@Value("${kafka.jks.location}")private String jksLocation;@Value("${kafka.sample.retrycount}")private String retrycount;public KafkaAliyunConfiguration() {//如果用-D 或者其它方式設(shè)置過,這里不再設(shè)置if (null == System.getProperty("java.security.auth.login.config")) {//請注意將 XXX 修改為自己的路徑//這個路徑必須是一個文件系統(tǒng)可讀的路徑,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");}System.out.println("環(huán)境變量中已有config文件,kafka配置為:"+System.getProperty("java.security.auth.login.config"));}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);if (StringUtils.isEmpty(jksLocation)) {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());} else {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);}props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(ProducerConfig.RETRIES_CONFIG, retrycount);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<String, String>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());kafkaTemplate.setDefaultTopic(defaultTopic);return kafkaTemplate;} }

此處定義了四個變量,通過配置文件注入:

brokerAddress kafka服務(wù)器地址

defaultTopic kafka默認(rèn)topic

jksLocation JKS文件地址(開發(fā)環(huán)境無需定義,直接讀取resources下的jks,但生產(chǎn)環(huán)境需讀取jar包外部的jks文件,所以此處需配置路徑)

retrycount 重試次數(shù)

配置文件properties中增加相應(yīng)變量

在application-beta.properties中增加對應(yīng)配置如下:

kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093kafka.sample.retrycount=100kafka.sample.topic=save_samplekafka.jks.location=/jar/kafka.client.truststore.jks

新建KafkaService發(fā)送消息

KafkaService.java

package com.biologic.api.service;import org.springframework.stereotype.Service;@Service public interface KafkaService {void sendMessage(String topic, String data);void releaseKafkaMsg(String barcode, String chip);}

KafkaServiceImpl.java

package com.biologic.api.service.impl;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import com.biologic.api.service.KafkaService;import net.sf.json.JSONArray; import net.sf.json.JSONObject;@Service public class KafkaServiceImpl implements KafkaService {@Value("${kafka.sample.topic}")private String sampleTopic;private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);// private final KafkaTemplate<Integer, String> kafkaTemplate;//// /**// * 注入KafkaTemplate// * @param kafkaTemplate kafka模版類// */// @Autowired// public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {// this.kafkaTemplate = kafkaTemplate;// }@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String data) {LOG.info("kafka sendMessage start");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<String, String> result) {LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);}});LOG.info("kafka sendMessage end");}public void releaseKafkaMsg(String barcode, String chip) {try {JSONArray data = new JSONArray();JSONObject kafka_sample_state = new JSONObject();kafka_sample_state.put("plate_id", chip);kafka_sample_state.put("barcode", barcode);kafka_sample_state.put("status", "release_report");data.add(kafka_sample_state);JSONObject sample_list = new JSONObject();sample_list.put("sample_list", data.toString());sendMessage(sampleTopic, sample_list.toString());} catch (Exception e) {e.printStackTrace();}}}

外部注入路徑變量的方式

我們上面的代碼中是把conf文件的路徑寫死的,如果需要變動地址,可以使用以下方式

環(huán)境注入conf文件路徑

因為代碼中會默認(rèn)獲取環(huán)境變量中的java.security.auth.login.config配置,所以只需要啟動時 賦值路徑即可。

-Djava.security.auth.login.config=你的配置絕對路徑

完整啟動springboot的項目命令如下:

java -jar /jar/report-api-1.0.0-SNAPSHOT.jar --spring.profiles.active=beta -Djava.security.auth.login.config=/jar/kafka_client_jaas.conf

變量注入conf文件路徑

注意 因為類的初始化在注入變量之前,所以conf的路徑不能用變量的方式注入,否則會報空指針錯誤。

如下用法會報錯

@Value("${kafka.conf.location}")private String confLocation;public KafkaAliyunConfiguration() {// 如果用-D 或者其它方式設(shè)置過,這里不再設(shè)置if (null == System.getProperty("java.security.auth.login.config")) {// 請注意將 XXX 修改為自己的路徑// 這個路徑必須是一個文件系統(tǒng)可讀的路徑,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", confLocation);System.out.println("使用配置中的路徑,kafka配置為:" + System.getProperty("java.security.auth.login.config"));} else {System.out.println("環(huán)境變量中已有config文件,kafka配置為:" + System.getProperty("java.security.auth.login.config"));}}

安全層面加固

因為直接conf文件中包含帳號密碼容易被其他人查看到,有一種方式是外部引入模版文件,使用環(huán)境變量中的帳號密碼修改conf文件。

模版文件如下:

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="_KAFKA_ALIYUN_USERNAME_"password="_KAFKA_ALIYUN_PASSWORD_"; };

使用shell命令從s3中下載conf文件并修改conf文件如下:

initContainers:- name: pull-libimage: anigeo/awscli:latestcommand: ["/bin/sh","-c"] args: ['aws s3 cp s3://test-env/kafka.client.truststore.jks /jar/ ;aws s3 cp s3://test-env/kafka_client_jaas.conf /jar/ ;sed -i "s/_KAFKA_ALIYUN_USERNAME_/${KAFKA_SSL_USERNAME}/" /jar/kafka_client_jaas.conf;sed -i "s/_KAFKA_ALIYUN_PASSWORD_/${KAFKA_SSL_PASSWORLD}/" /jar/kafka_client_jaas.conf']env:- name: AWS_DEFAULT_REGIONvalue: cn-southwest-2- name: KAFKA_SSL_USERNAMEvalueFrom:secretKeyRef:name: aliyun-kafkakey: username- name: KAFKA_SSL_PASSWORLDvalueFrom:secretKeyRef:name: aliyun-kafkakey: passwordvolumeMounts:- name: workdirmountPath: /jar

可能遇到的問題–org.apache.kafka.common.errors.UnsupportedSaslMechanismException: Client SASL mechanism ‘ONS’ not enabled in the server, enabled mechanisms are [PLAIN]

原因

代碼中使用的配置與conf中設(shè)置的安全機(jī)制不一致。

解決方式

PLAIN模式
代碼中

props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

對應(yīng)conf內(nèi)容

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx"; };

ONS模式
代碼中

props.put(SaslConfigs.SASL_MECHANISM, "ONS");

對應(yīng)conf內(nèi)容

KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX"; };

可能遇到的問題–nested exception is java.lang.NullPointerException

使用代碼為

public KafkaAliyunConfiguration() {if (StringUtils.isEmpty(confLocation)) {URL authLocation = KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");if (System.getProperty("java.security.auth.login.config") == null) {System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());}System.out.println("kafka配置為:"+authLocation.toExternalForm());} else {System.out.println("kafka配置為:"+confLocation);System.setProperty("java.security.auth.login.config", confLocation);} }

在進(jìn)行KafkaAliyunConfiguration初始化時報錯空指針。

Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'qualityServiceImpl': Unsatisfied dependency expressed through field 'kafkaService'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaServiceImpl': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaAliyunConfiguration' defined in URL [jar:file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/biologic/util/KafkaAliyunConfiguration.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.biologic.util.KafkaAliyunConfiguration$$EnhancerBySpringCGLIB$$88e40778]: Constructor threw exception; nested exception is java.lang.NullPointerException

原因 初始化KafkaAliyunConfiguration時,變量加載的順序問題導(dǎo)致無法識別到變量。

解決方式

方式一 初始化時不使用注入的變量
如下:

public KafkaAliyunConfiguration() {//如果用-D 或者其它方式設(shè)置過,這里不再設(shè)置if (null == System.getProperty("java.security.auth.login.config")) {//請注意將 XXX 修改為自己的路徑//這個路徑必須是一個文件系統(tǒng)可讀的路徑,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");}System.out.println("環(huán)境變量中已有config文件,kafka配置為:"+System.getProperty("java.security.auth.login.config"));}

方式二 將bean方法設(shè)置成static靜態(tài)方法

參考 spring boot整合shiro引用配置文件配置是出現(xiàn)的問題

可能遇到的問題–Caused by: java.io.FileNotFoundException: file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jks (No such file or directory)

原因

打入jar包的證書和conf文件無法讀取,或者沒有設(shè)置外部路徑導(dǎo)致默認(rèn)讀取項目內(nèi)的配置。

解決方式

通過外部明確的linux路徑進(jìn)行配置。

可能遇到問題–Configuration Error:Line 3: expected [option key]

ssl.truststore.location = file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jksServiceExceptionHandler.java[line:30] exception ERROR org.apache.kafka.common.KafkaException: Failed to construct kafka producerCaused by: java.io.IOException: Configuration Error:Line 3: expected [option key]

原因–配置文件無法讀取或者參數(shù)格式錯誤。

解決方法

通過外部明確的linux路徑進(jìn)行配置jks和conf文件, 并且注意conf中的參數(shù)格式–分號,冒號要與原文件一致。

轉(zhuǎn)載請注明出處:Springboot集成使用阿里云kafka詳細(xì)步驟

參考鏈接

https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.6.563.7b3b1e3bEl5oex

https://yq.aliyun.com/articles/433740

總結(jié)

以上是生活随笔為你收集整理的Springboot集成使用阿里云kafka详细步骤的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。