日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

5、kafka的操作

發布時間:2025/5/22 编程问答 83 豆豆
生活随笔 收集整理的這篇文章主要介紹了 5、kafka的操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

#1、通過shell腳本

  • 查看當前服務器中的所有topic

    bin/kafka-topics.sh --list --zookeeper zk01:2181

  • 創建topic

    bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test

  • 刪除topic

    bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

    需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。

  • 生產數據

    kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima

    演變操作:(不需要任何的數據采集工具)tail -F /root/log.log | kafka-console-producer.sh --broker-list hadoop1:9092 --topic accesslogs
  • 消費消息

    sh bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1

  • 查看消費位置

    sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

  • 查看某個Topic的詳情

    sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

  • 對分區數進行修改

    kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

#2、通過Java的api操作:

生產者API

public class KafkaProducerSimple {public static void main(String[] args) throws InterruptedException {String TOPIC = "test9";Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", "kafka01:9092");props.put("request.required.acks", "1");//定義一個partition分區器props.put("partitioner.class", "cn.itcast.storm.kafka.producer.MyLogPartitioner");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));int messageNo = 0;while (true){String messageStr = new String("produce數據");KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(TOPIC, messageNo + "", messageStr);producer.send(keyedMessage);messageNo +=1;}} }

消費者API

public class KafkaConsumerSimple implements Runnable {public String title;public KafkaStream<byte[], byte[]> stream;public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {this.title = title;this.stream = stream;}public void run() {System.out.println("開始運行 " + title);ConsumerIterator<byte[], byte[]> it = stream.iterator();/*** 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞* 如果調用 `ConsumerConnector#shutdown`,那么`hasNext`會返回false* */while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> data = it.next();String topic = data.topic();int partition = data.partition();long offset = data.offset();String msg = new String(data.message());System.out.println(String.format("Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",title, topic, partition, offset, msg));}System.err.println(String.format("Consumer: [%s] exiting ...", title));}public static void main(String[] args) throws Exception{// main方法Properties props = new Properties();props.put("group.id", "testGroup");props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");props.put("auto.offset.reset", "largest");props.put("auto.commit.interval.ms", "1000");props.put("partition.assignment.strategy", "roundrobin");ConsumerConfig config = new ConsumerConfig(props);String topic = "test9";//只要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);//創建topicCountMapMap<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic,9);Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);//取出 `kafkaTest` 對應的 streamsList<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);//創建一個容量為20的線程池ExecutorService executor = Executors.newFixedThreadPool(9);//創建20個consumer threadsfor (int i = 0; i < streams.size(); i++)executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));} }

轉載于:https://my.oschina.net/liufukin/blog/800434

總結

以上是生活随笔為你收集整理的5、kafka的操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产精品久久无码一三区 | 精品国产乱码一区二区 | 国产成人专区 | 综合色婷婷一区二区亚洲欧美国产 | 亚洲图片欧美另类 | 特黄视频免费看 | 欧美国产在线视频 | 中文字幕一区二区三区人妻四季 | 一区二区三区 欧美 | 欧美视频中文字幕 | 97av.com| 中文字幕 自拍 | 男生插女生网站 | 男女互操视频 | 免费看一级一片 | 护士的小嫩嫩好紧好爽 | 亚洲图片三区 | 特级特黄aaaa免费看 | 一本久久久久 | 亚洲精品国产乱伦 | 午夜久久电影 | 一区二区三区视频在线观看免费 | 亚洲在线影院 | 色91精品久久久久久久久 | 国产亚洲欧美在线 | 日韩欧美中文字幕一区二区 | 永久免费视频网站 | 午夜精品成人 | 国产成人在线免费 | 右手影院亚洲欧美 | av激情在线 | 日韩日日日 | 亚洲乱码国产乱码精品 | 欧美日韩一本 | 男人的天堂一区 | 一本大道综合伊人精品热热 | 狼色网 | 波多野结衣久久 | 五月激情婷婷综合 | 激烈娇喘叫1v1高h糙汉 | 久久久久久999 | www.国产欧美| 狠狠影院 | 伊人久久成人网 | 婷婷麻豆 | 成人午夜剧场视频网站 | 亚洲欧美在线视频 | 国产精品99久久 | 亚洲在线精品视频 | 亚洲va韩国va欧美va精品 | 久久中文免费视频 | 国产网站久久 | 亚洲欧美韩日 | 黄色三级三级三级 | 69网站在线观看 | 亚洲iv一区二区三区 | 亚洲国产日韩在线一区 | 国产福利专区 | 操她视频网站 | 91香蕉国产在线观看 | 日本精品网 | 婷婷激情五月综合 | 一区二区成人网 | 精品人妻一区二区色欲产成人 | 久久艹av| 午夜性生活视频 | 国产看黄网站 | 成人av中文字幕 | 一区二区在线看 | 日本aaa视频 | 国产1区2区3区中文字幕 | 久久橹 | 男人的天堂中文字幕 | 激情丁香| 亚av在线 | www.香蕉视频.com | 久久精工是国产品牌吗 | 黑人vs亚洲人在线播放 | 久久av喷吹av高潮av萌白 | 色一情一乱一区二区三区 | 成人毛片视频在线观看 | 国产在线不卡一区 | 本田岬av | 亚洲黄色片网站 | 亚洲精品免费观看 | 浴室里强摁做开腿呻吟男男 | 五月在线视频 | 美丽姑娘免费观看在线观看 | 国产欧美精品久久久 | 国产二区av | 成人免费在线视频网站 | 日本少妇bb | 日韩三级麻豆 | 国产精品一 | 激情网络 | bt天堂av| 性感美女视频一二三 | 国产黄色自拍 | 操你妹影院 |