日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka动态认证SASL/SCRAM配置+整合springboot配置

發布時間:2023/12/14 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka动态认证SASL/SCRAM配置+整合springboot配置 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

記錄:

zookeeper啟動命令:

[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start
[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop

kafka啟動命令:

/data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties

創建SCRAM證書

1)創建broker建通信用戶:admin(在使用sasl之前必須先創建,否則啟動報錯)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter?
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)創建生產用戶:producer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter?
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

?3)創建消費用戶:consumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter?
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

SCRAM-SHA-256/SCRAM-SHA-512是對密碼加密的算法,二者有其一即可

查看SCRAM證書

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

服務端配置

1)創建JAAS文件

vi config/kafka_server_jaas.conf

?內容:

?KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)將JAAS配置文件位置作為JVM參數傳遞給每個Kafka Broker【bin/kafka-server-start.sh】添加

exec $base_dir/kafka-run-class.sh?
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/test/kiki/kafka/ka/config/kafka_server_jaas.conf kafka.Kafka "$@"

操作:

vi bin/kafka-server-start.sh?

?修改最后一行

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_server_jaas.conf kafka.Kafka "$@"

3)配置server.properties【config/server.properties】

#認證配置
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

當前測試kafka端口為8100

?vi config/server.properties?

#listeners=PLAINTEXT://:8100

listeners=SASL_PLAINTEXT://:8100
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

advertised.listeners=SASL_PLAINTEXT://183.56.218.28:8100

#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

個人備注:

log.dirs=/data/program/kafka2.12/data

zookeeper.connect=183.56.218.28:2181

SCRAM-SHA-512與SCRAM-SHA-216可互相更改,看需要什么類型。PLAINTEXT為不需要認證

4)重啟Kafka和Zookeeper

客戶端配置

?1)為我們創建的三個用戶分別創建三個JAAS文件:分別命名為
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

vi bin/kafka_client_scram_admin_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec"; };

vi bin/kafka_client_scram_producer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="prod-sec"; };

vi bin/kafka_client_scram_consumer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec"; };

2)修改啟動腳本引入JAAS文件:
生產者配置:
配置bin/kafka-console-producer.sh

exec $(dirname $0)/kafka-run-class.sh?
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_producer_jaas.conf

消費者配置:
配置bin/kafka-console-consumer.sh

exec $(dirname $0)/kafka-run-class.sh?
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_consumer_jaas.conf

3)配置consumer.properties和producer.properties,都要加入以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

4)創建主題

[test@police ka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 1

5)啟動生產(ps:結束也未能成功測試該命令是否能用,后面在代碼方面配置就好)

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --producer.config config/producer.properties

發現會報權限相關的錯

6)對生產者賦予寫的權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
?--allow-principal User:producer --operation Write --topic test

7)對消費者賦予讀的權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
?--allow-principal User:consumer --operation Read --topic test

此時啟動消費者(ps:結束也未能成功測試該命令是否能用,后面在代碼方面配置就好)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties

此時依舊會報錯,報未對消費者組授權。給groupId配權

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add?
--allow-principal User:consumer --operation Read --group test-group

此時再啟動消費者,可以發現能正常消費生產者的消息

8)查看權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer?
--authorizer-properties zookeeper.connect=localhost:2181 --list

springboot整合配置

權限主要配置部分格式:

? ? ? props.put("security.protocol", "SASL_PLAINTEXT");
? ? ? props.put("sasl.mechanism", "SCRAM-SHA-512");
? ? ? props.put("sasl.jaas.config",
? ? ? ? ? ? "org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");

生產者:

//異步發送@Testfun customProducer() {//配置val properties = Properties()//鏈接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8100"//指定對應key和value的序列化類型(二選一) // properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT"properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"properties[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"producer\" password=\"prod-sec\";"val kafkaProducer = KafkaProducer<String, String>(properties)//發送數據for (i in 0 until 1) {//黏性發送,達到設置的數據最大值/時間后,切換分區(不會是當前分區)kafkaProducer.send(ProducerRecord("test", "我是成功:::${LocalDateTime.now()}"))}//"type":"UPDATE/ADD/DELETE"//關閉資源kafkaProducer.close()}

消費者

package com.umh.medicalbookingplatform.background.configimport com.umh.medicalbookingplatform.core.properties.ApplicationProperties import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory/*** @Description :* @Author xiaomh* @date 2022/8/30 14:14*/@EnableKafka @Configuration class KafkaConsumerConfig {@Autowiredprivate lateinit var appProperties: ApplicationProperties@Beanfun consumerFactory(): ConsumerFactory<String?, String?> {val props: MutableMap<String, Any> = HashMap()props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = appProperties.kafkaBootstrapServersConfig.toString()props[ConsumerConfig.GROUP_ID_CONFIG] = appProperties.kafkaGroupId.toString()props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = appProperties.kafkaSecurityProtocol.toString()props[SaslConfigs.SASL_MECHANISM] = appProperties.kafkaSaslMechanism.toString()props[SaslConfigs.SASL_JAAS_CONFIG] = appProperties.kafkaSaslJaasConfig.toString()return DefaultKafkaConsumerFactory(props)}@Beanfun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {val factory = ConcurrentKafkaListenerContainerFactory<String, String>()factory.setConsumerFactory(consumerFactory())return factory} }

yml

kafkaBootstrapServersConfig: xxxxxx:8100 kafkaGroupId: test-group kafkaSecurityProtocol: SASL_PLAINTEXT kafkaSaslMechanism: SCRAM-SHA-512 kafkaSaslJaasConfig: org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec";

?參考

Kafka動態認證SASL/SCRAM驗證_慕木兮人可的博客-CSDN博客

總結

以上是生活随笔為你收集整理的Kafka动态认证SASL/SCRAM配置+整合springboot配置的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 久久无码精品丰满人妻 | 国产视频分类 | 成人xxx| 国产精品第| 麻豆短视频 | 一区二区成人在线观看 | 亚洲尤物在线 | 91精彩视频 | 日韩欧美大片在线观看 | 韩国伦理电影免费在线 | 老王66福利网| 无码人妻精品一区二区三区温州 | 国产精品探花一区二区在线观看 | 在线观看亚洲一区 | 亚洲AV无码国产精品播放在线 | 五月天丁香 | 国产 日韩 欧美在线 | 久久亚洲精少妇毛片午夜无码 | 国产成人精品一区二区三区免费 | 久久精品5 | 亚洲人精品午夜射精日韩 | 亚洲成人福利在线 | 黄色视屏软件 | 激情久久网站 | 操操操网站 | 性欧美videos另类艳妇3d | 女同一区 | 999精品视频在线观看 | 老妇女玩小男生毛片 | 四虎网站 | 人妻换人妻a片爽麻豆 | 亚洲欧洲在线观看 | 日韩激情影院 | 亚洲精品亚洲 | 欧美人与禽猛交乱配视频 | 高清视频一区二区三区 | 亚洲欧美一区二区三区久久 | 免费男女乱淫真视频免费播放 | 欧美日韩国产一区二区三区 | 好吊妞视频一区二区三区 | 最新毛片网 | 国产一卡二 | 日韩欧美亚洲精品 | 91极品尤物 | 第一福利在线视频 | 成年女人色毛片 | 日本性xxxxx| 日韩高清成人 | 国产性在线| av片免费在线 | 在线欧美视频 | 一本色道久久综合亚洲 | 97超碰免费 | 中文字幕在线视频一区二区 | 奇米影视7777 | 精品国产乱码一区二 | 四虎福利 | va欧美| 成人福利在线播放 | 69精品久久 | 日韩视频免费播放 | 久草青青草| 卡通动漫亚洲综合 | 青青国产在线观看 | 超碰97国产精品人人cao | 中文字幕日韩一区二区三区不卡 | 亚洲久久久久久 | 免费av导航| 午夜影院啊啊啊 | 久久久成人av | 夜色视频网站 | 都市激情一区 | 色噜噜狠狠狠综合曰曰曰 | 欧美绿帽交换xxx | 在线国产观看 | 国产日韩一级片 | 男插女在线观看 | 韩国女同性做爰三级 | 国产精品久久久久久久久久小说 | 大度亲吻原声视频在线观看 | 亚洲在线观看免费 | 扶她futa粗大做到怀孕 | 黄色av网址在线 | 8050午夜二级 | 免费在线观看www | 日韩在线一级片 | 亚洲中文字幕无码av永久 | 波多野结衣一区二区三区中文字幕 | 日韩人妻一区二区三区 | 亚欧乱色 | 精品少妇一区二区 | 亚洲成肉网 | 日韩女优中文字幕 | 香蕉伊思人视频 | 一区二区欧美视频 | 日韩av网址大全 | 日本视频在线播放 | 人妻体内射精一区二区 | 欧美精品国产动漫 |