日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka 如何给集群配置Scram账户认证

發布時間:2023/12/14 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka 如何给集群配置Scram账户认证 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

很早之前的一篇博客【Kafka+ Centos7服務器集群詳細安裝教程】 詳細的說了下一個真正的集群應該如何搭建Kafka環境,由于當時的需求只是能夠使用Kafka服務,因此并沒有做別的什么配置。但是隨著Kafka不斷地使用,僅僅能用已經無法滿足業務的安全需求,因此對于Kafka做了一個權限的配置,那么本篇就針對Kafka如何配置賬戶認賬做一個詳細的演示,理論知識會稍有涉及但不是重點。更多內容請點擊【Apache Kafka API AdminClient 目錄】。

官網的權限支持

Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
? SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0
? SASL/PLAIN - starting at version 0.10.0.0
? SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0
? SASL/OAUTHBEARER - starting at version 2.0

Kafka本身支持的認證方式有很多,同時支持SSL或者SASL,上面展示的四種是kafka官方推薦的SASL認證方式,如何選擇一個合適的認證方式呢?我們可以做一個橫向對比。

認證方式說明
SASL/GSSAPI主要是給 Kerberos 用戶使用的,如果當前已經有了Kerberos認證,只需要給集群中每個Broker和訪問用戶申請Principals,然后在Kafka的配置文件中開啟Kerberos的支持即可,官方參考:[Authentication using SASL/Kerberos]。
SASL/PLAIN是一種簡單的用戶名/密碼身份驗證機制,通常與TLS/SSL一起用于加密,以實現安全身份驗證。是一種比較容易使用的方式,但是有一個很明顯的缺點,這種方式會把用戶賬戶文件配置到一個靜態文件中,每次想要添加新的賬戶都需要重啟Kafka去加載靜態文件,才能使之生效,十分的不方便,官方參考[Authentication using SASL/PLAIN]。
SASL/SCRAM通過將認證用戶信息保存在 ZooKeeper 里面,從而動態的獲取用戶信息,相當于把ZK作為一個認證中心使用了。這種認證可以在使用過程中,使用 Kafka 提供的命令動態地創建和刪除用戶,無需重啟整個集群,十分方便。官方參考[Authentication using SASL/SCRAM]。
SASL/OAUTHBEARERkafka 2.0 版本引入的新認證機制,主要是為了實現與 OAuth 2 框架的集成。Kafka 不提倡單純使用 OAUTHBEARER,因為它生成的不安全的 Json Web Token,必須配以 SSL 加密才能用在生產環境中。官方參考[Authentication using SASL/OAUTHBEARER] 。

如果使用SASL/GSSAPI那么需要新搭建Kerberos不太劃算;SASL/PLAIN的方式可能會在使用過程中頻繁的重啟,非常的繁瑣;而SASL/OAUTHBEARER屬于Kafka新提供的,而且也沒有這方面的需求,可以等等市場反應再說。因此綜合來說最終選擇了SASL/SCRAM的認證方法增強Kafka的安全功能,這也是本篇博客的由來。

Zookeeper配置

Zookeeper集群怎么搭建參考【Zookeeper + Centos7 詳細安裝教程】,目前筆者用的Zookeeper 已經升級到3.5.8,但是搭建過程是一樣的,這里只貼出來Zookeeper的配置文件。首先進入Zookeeper根目錄下的/config目錄。


給zoo.cfg 添加SASL認證

新添加的兩行,用來支持SASL認證。

tickTime=2000 initLimit=10 syncLimit=5 dataDir=/home/data/zookeeper_data dataLogDir=/home/data/zookeeper_log #這里可以不配置,直接刪了 clientPort=2181#新添加的兩行在這里 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=saslserver.1=192.168.33.101:2887:3887 server.2=192.168.33.102:2887:3887 server.3=192.168.33.103:2887:3887 server.4=192.168.33.104:2887:3887 server.5=192.168.33.105:2887:3887

創建Zookeeper認證文件

使用vi zookeeper_jaas.conf命令把下面的內容粘貼進去,保存備用。

Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_super="super1234" user_kafka="kafka1234"; };

這里要對這兩個參數說明一下:user_super="super1234"這句配置的是超級用戶,在Zookeeper里面超級用戶默認就是super,后面引號里設置的則是它的密碼super1234。因此下面的user_kafka="kafka1234"設置的是Kafka連接Zookeeper要用的賬戶和密碼,這個賬戶和密碼在后面Kafka的配置中還要用,請先記住這點。其意思就是,一個叫做kafka的賬戶名,密碼是kafka1234。前面的user_就是為了識別這個配置是一個賬戶名用的,吐槽下這個設計真是好奇葩。


啟動時加載認證文件

回到/bin目錄,使用vi zookeeper-start.sh命令,創建一個新的啟動腳本,用于Zookeeper啟動時加載認證文件,把下面的內容粘貼進去,保存退出。注意這里的文件路徑和啟動路徑都是在/bin下的,如果你配置的東西不在這里,要修改路徑。

export JVMFLAGS="-Djava.security.auth.login.config=../conf/zookeeper_jaas.conf -Dzookeeper.4lw.commands.whitelist=*" ./zkServer.sh start &

然后使用命令chmod u+x zookeeper-start.sh給腳本文件賦權。


啟動

到此如果在/bin目錄下直接輸入命令sh zookeeper-start.sh就可以啟動Zookeeper,有如下字樣Zookeeper就是啟動成功了。

[root@centos01 bin]# sh zookeeper-start.sh [root@centos01 bin]# ZooKeeper JMX enabled by default Using config: /usr/app-zk-test/apps/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED

如果不放心還可以用ps –ef | grep “zookeeper”來看下啟動的現成是不是加載了zookeeper_jaas.conf文件,截取一部分輸出,下面紅框就是加載的認證文件。也可以用zkCli.sh去連接服務器看看是不是真的可用。那么到此Zookeeper配置啟動完畢。

Kafka配置

啟動完Zookeeper以后,就要著手進行Kafka相關的配置了。一個可用的Kafka集群怎么搭建的可以參考這篇文章【Kafka+ Centos7服務器集群詳細安裝教程】,目前筆者的版本已經升級到Kafka 2.7.0,但是這個帖子依然可以使用。首先進入Kafka根目錄下的/config目錄,筆者先貼上上次博客中所配置的部分。

#服務器id,這個是每個機器的唯一id。每個機器都是獨一無二的,習慣性就按順序來。 broker.id=1 #監聽端口,這里也可以用hostname代替,只要host里面配置好了都可以 listeners = PLAINTEXT://192.168.33.101:9092 #存放當前數據的目錄,這里所有機器盡量都一樣,這樣操作起來比較容易 log.dirs=/home/data/kafka-logs #連接Zookeeper。注意這里筆者再Zookeeper里面創建了一個叫做“/kakfa”的節點,所以最后一個ip上跟著 # 一個/kafka,看起來比較整潔。如果沒有這樣的需求不需要帶。 zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181/kafka #如果要直連根節點,請仿照這個 #zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181

上面就是之前博客中所有的Kafka配置,基本上就是一個裸連能用的配置。首先我們先說下listeners這個參數,這個參數是監聽端口,必須要配置的。采用什么樣的認證也是通過這個參數后面配置的參數決定的。Kafka默認的參數是PLAINTEXT,也就是我們上面配置的listeners = PLAINTEXT://192.168.33.101:9092這句話。這里的PLAINTEXT就是代表沒有任何認證,阿貓阿狗都可以連進來。要說明的是PLAINTEXT和上面介紹的SASL/PLAIN完全不是一個概念,SASL/PLAIN是一種文件認證方式,而PLAINTEXT是一個具體的認證模式。如果要開啟認證則需要使用SASL_PLAINTEXT,或者進一步使用SASL_SSL進行雙重加密,這個是后話了。

給server.properties添加SASL認證

參數到底應該怎么配置呢?下面就是一個筆者配置的關鍵參數例子,大家可以按照這個參數去配置自己的Kafka認證。至于其他的參數,大家可以根據實際需求取舍,這里筆者推薦幾個重要的參數,實際使用中最好配置上。

############################ 基礎配置如下 ############################## broker.id=1 #默認監控端口,設置9092使用SASL_PLAINTEXT協議 listeners=SASL_PLAINTEXT://192.168.33.101:9092 #advertised.listeners控制生產者與消費者接入的端口,如果不設置默認都用listeners,設置9092使用SASL_PLAINTEXT協議 advertised.listeners=SASL_PLAINTEXT://192.168.33.101:9092 log.dirs=/home/data/kafka-logs zookeeper.connect=192.168.33.101:2181,192.168.33.102:2181,192.168.33.103:2181,192.168.33.104:2181,192.168.33.105:2181/kafka############################ SASL/SCRAM相關配置如下 ############################## #Broker內部聯絡使用的security協議 security.inter.broker.protocol=SASL_PLAINTEXT #Broker內部聯絡使用的sasl協議,這里也可以配置多個,比如SCRAM-SHA-512,SCRAM-SHA-256并列使用 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 #Broker允許使用的sasl協議,這里也可以配多個PLAIN,SCRAM-SHA-512,SCRAM-SHA-256 sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512#設置zookeeper是否使用ACL zookeeper.set.acl=true #設置ACL類(低于 2.4.0 版本推薦使用 SimpleAclAuthorizer) #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #設置ACL類(高于 2.4.0 版本推薦使用 AclAuthorizer) authorizer.class.name=kafka.security.authorizer.AclAuthorizer #設置Kafka超級用戶賬號,這兩個分別對應zookeeper_jaas.conf中的user_super="super1234"和user_kafka="kafka1234"; super.users=User:admin;User:kafka######################## 其他輔助配置,筆者推薦的重要配置 ####################### #每條最大消息設置為3MB,超過此size會報錯,可以自由調整 replica.fetch.max.bytes=3145728 message.max.bytes=3145728 #默認的備份數量,可以自由調整 default.replication.factor=2 #默認的partion數量,可以自由調整 num.partitions=3 #是否允許徹底刪除topic,低版本這里設置為false則是隱藏topic delete.topic.enable=true #如果topic不存在,是否允許創建一個新的。這里特別推薦設置為false,否則可能會因為手滑多出很多奇奇怪怪的topic出來 auto.create.topics.enable=false

創建Kafka認證文件

說完服務器的配置,現在還是在/config目錄下,我們還需要在啟動的時候加載一個認證文件。所以直接vi kafka-broker-jaas.conf創建一個認證文件,粘貼下面內容,保存退出。

KafkaServer {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="admin"password="admin1234"; }; Client {org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="kafka"password="kafka1234"; };

這里要解釋一下這兩塊的內容:首先KafkaServer這里配置的是Kafka服務器本身的超級賬戶admin和其密碼,使用的是ScramLoginModule模式,也就是標題的登陸認證方式。直接使用這個超級賬戶登陸,整個Kafka集群就相當于對你打開了大門。需要設計一些Kafka工具的時候可以使用,所以好好保存不要泄露了。后面配置的Client是用來登陸Zookeeper使用的,也就是上面我們配置到zookeeper_jaas.conf 文件中的user_kafka="kafka1234"一行所對應的,這里看到登陸Zookeeper要用的賬戶就是kafka,密碼就是kafka1234。這點設計的比較繞,需要多理解理解。


啟動時加載認證文件

直接在/config文件下執行vi kafka-starter.sh,這里筆者懶得跳到/bin目錄下,直接創建了。粘貼下面內容到文件里,保存退出。然后使用命令chmod u+x kafka-starter.sh給腳本文件賦權。

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/app-zk-test/apps/kafka_2.13-2.7.0/config/kafka-broker-jaas.conf" export JMX_PORT=9999 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" ../bin/kafka-server-start.sh ../config/server.properties &

第一行KAFKA_OPTS配置的是加載的認證文件的路徑;第二行JMX_PORT是監控端口,可以不配置;第三行KAFKA_HEAP_OPTS是配置啟動占用內存的,隨意調整,也可以不配置用默認的;第四行執行Kafka開始腳本和做好的配置文件。


給Zookeeper中添加超級賬戶

完成以后跳轉到Kafka的/bin目錄下,用自帶的kafka-configs.sh腳本把Kafka服務器的超級賬戶添加到Zookeeper中,因為目前(Kafka 2.7.0)來說Kafka賬號密碼還是存在Zookeeper上的。這一步不需要Kafka啟動,但是Zookeeper要啟動。

#輸入命令創建超級用戶 sh kafka-configs.sh --zookeeper 192.168.33.101:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin1234]' --entity-type users --entity-name admin#如果是在Zookeeper根目錄下,不需要帶“/kafka” sh kafka-configs.sh --zookeeper 192.168.33.101:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin1234]' --entity-type users --entity-name admin

啟動類配置認證文件

當上述步驟都配置完畢以后有些/bin目錄下的命令(比如kafka-console-producer.sh)都不能直接使用了,需要帶著用戶名密碼才可以,這就給我們做一些簡單的測試造成了很大的麻煩。我們可以通過在啟動類中配置認證文件,從而跳過用戶名密碼的輸入,這一步就是讓Kafka服務器識別SASL/PLAIN的認證方式。具體做法就是vi kafka-run-class.sh打開這個腳本,然后把下面的一行貼進去,文件開頭,文件末尾都可以,不要貼到循環或者if條件語句中就行。保存退出就可以準備啟動了,再次提醒認證文件路徑要寫對。

#粘貼下面這一行到kafka-run-class.sh腳本中 KAFKA_OPTS="-Djava.security.auth.login.config=/usr/app-zk-test/apps/kafka_2.13-2.7.0/config/kafka-broker-jaas.conf"

啟動

重新跳轉到/config目錄下,執行sh kafka-starter.sh啟動Kafka。筆者推薦使用nohup sh kafka-starter.sh后臺啟動,這樣日志就不會打印到前臺,還可以做別的事情。那么所有的Kafka機器按照上面的步驟配置一邊,然后啟動完畢,就可以開始使用了。第一運行會打印下面的started字樣,如果不是第一次啟動,則會把所有內容加載一遍輸出,這個字段就不太好找了。如果是nohup啟動,需要去nohup.out中查看。

[2020-08-03 21:14:43,863] INFO Kafka commitId : 21234bee31165527 (org.apache.kafka.common.utils.AppInfoParser) [2020-08-03 21:14:43,864] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)

注意事項

  • 以上步驟凡是涉及到目錄和名稱都是筆者自己在機器上配置的,不必一定按照筆者的來,只要路徑寫對了能訪問到對應文件,都沒有問題。如果按照筆者操作和配置的路徑走,一定不會有路徑問題。
  • 以上步驟必須全部在Linux(Centos、Ubuntu)系統下操作,切不可圖省事兒從外部Windows拖拽或者上傳到Linux,可能會因為文件內容的格式不一致導致無法啟動Zookeeper或者Kafka。
  • 以上步驟要在所有機器上做一遍,避免某個節點掛了整個服務不可用。
  • 驗證程序

    為了保證我們的配置確實可用,筆者提供了生產者和消費者測試程序,大家可以測試一下配置的集群是否能夠使用Scram登陸。
    Producer:

    public class TestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.33.101:9092");props.put("acks", "1");props.put("retries", 3);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");/*props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin1234';");*/KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "topic_" + i, "topic_:" + i);Future<RecordMetadata> metadataFuture = producer.send(record);RecordMetadata recordMetadata = null;try {recordMetadata = metadataFuture.get();System.out.println("發送成功!");System.out.println("topic:" + recordMetadata.topic());System.out.println("partition:" + recordMetadata.partition());System.out.println("offset:" + recordMetadata.offset());} catch (Exception e) {System.out.println("發送失敗!");e.printStackTrace();}}producer.flush();producer.close();} }如果有未授權的用戶嘗試發送數據會報錯 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

    Consumer:

    public class EasyTestConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.33.101:9092");props.put("group.id", "aaa");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");/*props.put("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin1234';");*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),record.offset(), record.key(), record.value());}}} }如果有未授權的用戶嘗試讀取數據就會報錯 Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 如果有未授權的分組嘗試讀取數據就會報錯 Exception in thread "main" org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: aaa

    認證后賦權

    配置好以后除了admin超級賬戶以外其他所有客戶端都無法直接連接因此我們需要對賬號進行賦權,可以參照以下命令執行。

    創建賬號: sh kafka-configs.sh --zookeeper 192.168.33.101:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=easy1234]' --entity-type users --entity-name easy 添加賬號寫權限: sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.33.101:2181/kafka --add --allow-principal User:easy --operation Read --topic my-topic 添加賬號讀權限: sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.33.101:2181/kafka --add --allow-principal User:easy --operation Read --topic my-topic 創建Group: sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.33.101:2181/kafka --add --allow-principal User: easy --group aaa

    如果要刪除,只需要把--add換成--remove即可,或者參考【Apache Kafka API AdminClient 目錄】里的內容,使用官方提供的AdminClient寫一套Java版本的工具出來,用命令行操作實在太繁瑣了。要對Kafka進行更一進步的SSL認證,可以參考【Kafka 如何給集群配置SSL認證】。

    可能的報錯

    這里照例列出筆者在配置過程中遇到的報錯。

    1. inter.broker.listener.name must be a listener name defined in advertised.listeners.

    [2021-02-18 15:50:42,400] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXTat kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1781)at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1756)at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1312)at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:34)at kafka.Kafka$.main(Kafka.scala:68)at kafka.Kafka.main(Kafka.scala)

    這個報錯有2種可能:

  • 沒有配置advertised.listeners這個參數,配置上即可;
  • 參數security.inter.broker.protocol=SASL_PLAINTEXT里面配置的內容在listeners 和advertised.listeners里面都沒有。因為這兩個參數可以給不同的端口配置不同的安全協議,比如listeners=SASL_PLAINTEXT:// 192.168.33.101:9092, PLAINTEXT:// 192.168.33.101:9093, SASL_SSL:// 192.168.33.101:9094。這樣用戶就可以通過9092端口使用SASL/Scram驗證連接,使用9094端口使用SSL+SASL驗證連接,以及通過9093端口進行完全不要任何驗證的連接,因此PLAINTEXT一定不要在生產環境上配置,太危險了?;氐轿覀兊膱箦e,security.inter.broker.protocol配置的內容,必須在listeners里面配置的有才行,比如下面的配置就一定會報這個異常。
  • listeners= PLAINTEXT:// 192.168.33.101:9093 security.inter.broker.protocol= SASL_PLAINTEXT############ 下面是一些說明 ############### #listeners和advertised.listeners參數可以根據不同的端口和需求配置不同的認證方式,建議二者保持一致,如果有內外網,就區分端口給不同的上下游 listeners=SASL_PLAINTEXT://192.168.33.101:9092,PLAINTEXT://192.168.33.101:9093,SASL_SSL://192.168.33.101:9094 advertised.listeners=SASL_PLAINTEXT://192.168.33.101:9092,PLAINTEXT://192.168.33.101:9093,SASL_SSL://192.168.33.101:9094 #security.inter.broker.protocol這個參數的值必須配置一個上面有的才行 security.inter.broker.protocol= SASL_PLAINTEXT

    2. ERROR SASL authentication failed using login context ‘Client’ with exception: {}

    [2021-02-14 17:47:16,241] ERROR SASL authentication failed using login context 'Client' with exception: {} (org.apache.zookeeper.client.ZooKeeperSaslClient) javax.security.sasl.SaslException: Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.at org.apache.zookeeper.client.ZooKeeperSaslClient.createSaslToken(ZooKeeperSaslClient.java:312)at org.apache.zookeeper.client.ZooKeeperSaslClient.respondToServer(ZooKeeperSaslClient.java:275)at org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:882)at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:101)at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1223) [2021-02-14 17:47:16,243] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient) [2021-02-14 17:47:16,244] INFO EventThread shut down for session: 0x20000121b750003 (org.apache.zookeeper.ClientCnxn) [2021-02-14 17:47:16,269] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /kafkaat org.apache.zookeeper.KeeperException.create(KeeperException.java:130)at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:564)at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1662)at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1560)at kafka.server.KafkaServer.$anonfun$initZkClient$2(KafkaServer.scala:461)at kafka.server.KafkaServer.$anonfun$initZkClient$2$adapted(KafkaServer.scala:458)at scala.Option.foreach(Option.scala:437)at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:458)at kafka.server.KafkaServer.startup(KafkaServer.scala:233)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)at kafka.Kafka$.main(Kafka.scala:82)at kafka.Kafka.main(Kafka.scala)

    這種報錯是Kafka無法鏈接Zookeeper導致的,一般有以下兩種原因:

  • 使用windows拖拽的方式會導致有無法看見的結束符,不識別conf文件而造成失敗。
  • 或者啟動的時候沒有用新建的bash文件加載配置文件,而是直接啟動服務器腳本,導致沒有加載conf文件報錯。
  • 總結

    以上是生活随笔為你收集整理的Kafka 如何给集群配置Scram账户认证的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。