
Consuming Kafka under CDH Kerberos Authentication

Published: 2024/8/23

For installing Kerberos authentication on the cluster, see: https://datamining.blog.csdn.net/article/details/98480008

Contents

Environment
Configuration
Java Producer Code
File contents
kafka_client_jaas.conf
krb5.conf (copy over the Kerberos configuration file)
kafka.keytab
Java Consumer Code
Linux Console Consumption
Linux Console Producing
Linux Console Topic Creation and Deletion


Environment

CDH 6.x

Kafka 1.0.1

A Kafka cluster with Kerberos authentication enabled cannot be consumed directly through the client API; the client must authenticate first.

Configuration

Check whether the Kafka configuration in CDH matches the settings below; change it if it differs.

Java Producer Code

Only the configuration-related code is shown here; everything else is the same as for an ordinary producer.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducer {

    private static BlockingQueue<Future<RecordMetadata>> queue = new ArrayBlockingQueue<Future<RecordMetadata>>(8192 * 2);
    private static long lastCommitTime = 0;
    private static boolean flag = true;
    Producer<String, String> producer = null;

    public KafkaProducer() {
        System.setProperty("java.security.auth.login.config",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");
        // The two files can also be specified as JVM options at launch instead:
        // -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_client_jaas.conf
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("max.request.size", 8000000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
    }
}
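The Kerberos-specific settings can be factored into a small helper and reused by both producer and consumer. A minimal, stdlib-only sketch; the broker address is the one used later in this article, and `sasl.mechanism`/`sasl.kerberos.service.name` are taken from the console-client configs shown further down, not values the Java code above sets explicitly:

```java
import java.util.Properties;

public class KerberosClientProps {

    // Builds the security-related properties shared by Kerberos-secured
    // producers and consumers. GSSAPI and the "kafka" service name match
    // the consumer.properties/producer.properties files used later.
    public static Properties securityProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        Properties props = securityProps("192.168.0.200:9092");
        // Serializers, acks, batch settings, etc. are then added on top,
        // exactly as in the producer constructor above.
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

Keeping these four keys in one place avoids the producer and consumer drifting apart when, for example, the cluster is later switched to SASL_SSL.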

Files under the resources directory

File contents:

kafka_client_jaas.conf

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"
    storeKey=true
    useTicketCache=false
    principal="kafka@JAST.COM"
    serviceName=kafka;
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka.keytab"
    storeKey=true
    useTicketCache=false
    principal="kafka@JAST.COM"
    serviceName=kafka;
};

krb5.conf (copy over the Kerberos configuration file)

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 pkinit_anchors = /etc/pki/tls/certs/ca-bundle.crt
 default_realm = JAST.COM
# default_ccache_name = KEYRING:persistent:%{uid}

[realms]
 JAST.COM = {
  kdc = cs-1
  admin_server = cs-1
 }

[domain_realm]
 .jast.com = JAST.COM
 jast.com = JAST.COM


kafka.keytab

The keytab file generated by Kerberos.

To generate it:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

For details, see:

https://datamining.blog.csdn.net/article/details/98625330

Java Consumer Code

Essentially the same as the producer; see the producer code for notes on the configuration files.

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumer {

    private static KafkaConsumer kafkaSink = null;
    org.apache.kafka.clients.consumer.KafkaConsumer consumer;
    private static int number;

    public KafkaConsumer(String topic, int count) {
        System.setProperty("java.security.auth.login.config",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf",
                "C:\\Users\\Administrator\\eclipse-workspace\\kafka\\src\\main\\resources\\krb5.conf");
        // The two files can also be specified as JVM options at launch instead:
        // -Djava.security.krb5.conf=/krb5.conf -Djava.security.auth.login.config=/kafka_client_jaas.conf
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaParameter.KAFKA_HOST_IP);
        props.put("group.id", "y");
        props.put("zookeeper.session.timeout.ms", "600000");
        props.put("zookeeper.sync.time.ms", "200000");
        props.put("auto.commit.interval.ms", "100000");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG, count + ""); // cap records returned per poll
        props.put("security.protocol", "SASL_PLAINTEXT");
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(topic)); // e.g. "collectionInfo"
    }
}
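As the inline comments note, the two security files can be supplied either programmatically or as JVM flags; both routes end up in the same system properties, and the values must be in place before the first Kafka client is constructed, because the login configuration is read when the SASL client initializes. A stdlib-only sketch of the programmatic route (the paths are illustrative):

```java
public class KerberosJvmSetup {

    // Points the JVM at the JAAS and krb5 files. Equivalent to launching with:
    //   -Djava.security.auth.login.config=<jaasPath> -Djava.security.krb5.conf=<krb5Path>
    // Call this before constructing the first producer or consumer.
    public static void configure(String jaasPath, String krb5Path) {
        System.setProperty("java.security.auth.login.config", jaasPath);
        System.setProperty("java.security.krb5.conf", krb5Path);
    }

    public static void main(String[] args) {
        configure("/root/jast/kafka_client_jaas.conf", "/etc/krb5.conf");
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

The JVM-flag form is preferable when the same jar must run on machines where the file locations differ, since no recompilation is needed.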

Linux Console Consumption

Generate a keytab file for the kafka user:

kadmin.local -q "xst -norandkey -k kafka.keytab kafka@JAST.COM"

Create a kafka_client_jaas.conf file (location is arbitrary) with the following content:

# cat config/kafka_jaas.conf
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/root/jast/kafka.keytab"
    storeKey=true
    useTicketCache=false
    principal="kafka@JAST.COM"
    serviceName=kafka;
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/root/jast/kafka.keytab"
    storeKey=true
    useTicketCache=false
    principal="kafka@JAST.COM"
    serviceName=kafka;
};

Add an environment variable pointing to the JAAS file:

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_client_jaas.conf"

Create a consumer.properties file with the following content:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=test11

Consumption now works:

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.200:9092 --topic test --from-beginning --consumer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/consumer.properties

Data is consumed successfully.

Linux Console Producing

Create a producer.properties file with the following content:

# cat producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

Send data:

/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/bin/kafka-console-producer.sh --broker-list 192.168.0.200:9092 --topic test --producer.config /opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/producer.properties

Type messages on the producer side; the consumer then receives them successfully.

Linux Console Topic Creation and Deletion

After setting the JAAS environment variable above on the Linux system, the usual topic commands work:

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/cloudera/parcels/CDH-6.0.1-1.cdh6.0.1.p0.590678/lib/kafka/config/kafka_jaas.conf"

