日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

漫游kafka实战篇之搭建Kafka开发环境

發(fā)布時間:2024/4/11 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 漫游kafka实战篇之搭建Kafka开发环境 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
轉載注明出處:http://blog.csdn.net/honglei915/article/details/37563647
Kafka視頻教程同步首發(fā),歡迎觀看!


上篇文章中我們搭建了kafka的服務器,并可以使用Kafka的命令行工具創(chuàng)建topic,發(fā)送和接收消息。下面我們來搭建kafka的開發(fā)環(huán)境。
添加依賴

搭建開發(fā)環(huán)境需要引入kafka的jar包,一種方式是將Kafka安裝包中l(wèi)ib下的jar包加入到項目的classpath中,這種比較簡單了。不過我們使用另一種更加流行的方式:使用maven管理jar包依賴。 創(chuàng)建好maven項目后,在pom.xml中添加以下依賴:
[html]?view plaincopy
  • <dependency>??
  • ????????????<groupId>org.apache.kafka</groupId>??
  • ????????????<artifactId>kafka_2.10</artifactId>??
  • ????????????<version>0.8.2.2</version>??
  • ????????</dependency>??



  • 添加依賴后如果有兩個jar包的依賴找不到。點擊這里下載這兩個jar包,解壓后你有兩種選擇,第一種是使用mvn的install命令將jar包安裝到本地倉庫,另一種是直接將解壓后的文件夾拷貝到mvn本地倉庫的com文件夾下,比如我的本地倉庫是d:\mvn,完成后我的目錄結構是這樣的:


    配置程序
    更新更全的API編程實例點這里:http://blog.csdn.net/honglei915/article/details/37697655
    首先是一個充當配置文件作用的接口,配置了Kafka的各種連接參數: [java]?view plaincopy
  • package?com.sohu.kafkademon;??
  • ??
  • public?interface?KafkaProperties??
  • {??
  • ????final?static?String?zkConnect?=?"10.22.10.139:2181";??
  • ????final?static?String?groupId?=?"group1";??
  • ????final?static?String?topic?=?"topic1";??
  • ????final?static?String?kafkaServerURL?=?"10.22.10.139";??
  • ????final?static?int?kafkaServerPort?=?9092;??
  • ????final?static?int?kafkaProducerBufferSize?=?64?*?1024;??
  • ????final?static?int?connectionTimeOut?=?20000;??
  • ????final?static?int?reconnectInterval?=?10000;??
  • ????final?static?String?topic2?=?"topic2";??
  • ????final?static?String?topic3?=?"topic3";??
  • ????final?static?String?clientId?=?"SimpleConsumerDemoClient";??
  • }??


  • producer
    [java]?view plaincopy
  • package?com.sohu.kafkademon;??
  • ??
  • import?java.util.Properties;??
  • ??
  • import?kafka.producer.KeyedMessage;??
  • import?kafka.producer.ProducerConfig;??
  • ??
  • /**?
  • ?*?@author?leicui?bourne_cui@163.com?
  • ?*/??
  • public?class?KafkaProducer?extends?Thread??
  • {??
  • ????private?final?kafka.javaapi.producer.Producer<Integer,?String>?producer;??
  • ????private?final?String?topic;??
  • ????private?final?Properties?props?=?new?Properties();??
  • ??
  • ????public?KafkaProducer(String?topic)??
  • ????{??
  • ????????props.put("serializer.class",?"kafka.serializer.StringEncoder");??
  • ????????props.put("metadata.broker.list",?"10.22.10.139:9092");??
  • ????????producer?=?new?kafka.javaapi.producer.Producer<Integer,?String>(new?ProducerConfig(props));??
  • ????????this.topic?=?topic;??
  • ????}??
  • ??
  • ????@Override??
  • ????public?void?run()?{??
  • ????????int?messageNo?=?1;??
  • ????????while?(true)??
  • ????????{??
  • ????????????String?messageStr?=?new?String("Message_"?+?messageNo);??
  • ????????????System.out.println("Send:"?+?messageStr);??
  • ????????????producer.send(new?KeyedMessage<Integer,?String>(topic,?messageStr));??
  • ????????????messageNo++;??
  • ????????????try?{??
  • ????????????????sleep(3000);??
  • ????????????}?catch?(InterruptedException?e)?{??
  • ????????????????//?TODO?Auto-generated?catch?block??
  • ????????????????e.printStackTrace();??
  • ????????????}??
  • ????????}??
  • ????}??
  • ??
  • }??



  • consumer
    [java]?view plaincopy
  • package?com.sohu.kafkademon;??
  • ??
  • import?java.util.HashMap;??
  • import?java.util.List;??
  • import?java.util.Map;??
  • import?java.util.Properties;??
  • ??
  • import?kafka.consumer.ConsumerConfig;??
  • import?kafka.consumer.ConsumerIterator;??
  • import?kafka.consumer.KafkaStream;??
  • import?kafka.javaapi.consumer.ConsumerConnector;??
  • ??
  • /**?
  • ?*?@author?leicui?bourne_cui@163.com?
  • ?*/??
  • public?class?KafkaConsumer?extends?Thread??
  • {??
  • ????private?final?ConsumerConnector?consumer;??
  • ????private?final?String?topic;??
  • ??
  • ????public?KafkaConsumer(String?topic)??
  • ????{??
  • ????????consumer?=?kafka.consumer.Consumer.createJavaConsumerConnector(??
  • ????????????????createConsumerConfig());??
  • ????????this.topic?=?topic;??
  • ????}??
  • ??
  • ????private?static?ConsumerConfig?createConsumerConfig()??
  • ????{??
  • ????????Properties?props?=?new?Properties();??
  • ????????props.put("zookeeper.connect",?KafkaProperties.zkConnect);??
  • ????????props.put("group.id",?KafkaProperties.groupId);??
  • ????????props.put("zookeeper.session.timeout.ms",?"40000");??
  • ????????props.put("zookeeper.sync.time.ms",?"200");??
  • ????????props.put("auto.commit.interval.ms",?"1000");??
  • ????????return?new?ConsumerConfig(props);??
  • ????}??
  • ??
  • ????@Override??
  • ????public?void?run()?{??
  • ????????Map<String,?Integer>?topicCountMap?=?new?HashMap<String,?Integer>();??
  • ????????topicCountMap.put(topic,?new?Integer(1));??
  • ????????Map<String,?List<KafkaStream<byte[],?byte[]>>>?consumerMap?=?consumer.createMessageStreams(topicCountMap);??
  • ????????KafkaStream<byte[],?byte[]>?stream?=?consumerMap.get(topic).get(0);??
  • ????????ConsumerIterator<byte[],?byte[]>?it?=?stream.iterator();??
  • ????????while?(it.hasNext())?{??
  • ????????????System.out.println("receive:"?+?new?String(it.next().message()));??
  • ????????????try?{??
  • ????????????????sleep(3000);??
  • ????????????}?catch?(InterruptedException?e)?{??
  • ????????????????e.printStackTrace();??
  • ????????????}??
  • ????????}??
  • ????}??
  • }??


  • 簡單的發(fā)送接收
    運行下面這個程序,就可以進行簡單的發(fā)送接收消息了: [java]?view plaincopy
  • package?com.sohu.kafkademon;??
  • ??
  • /**?
  • ?*?@author?leicui?bourne_cui@163.com?
  • ?*/??
  • public?class?KafkaConsumerProducerDemo??
  • {??
  • ????public?static?void?main(String[]?args)??
  • ????{??
  • ????????KafkaProducer?producerThread?=?new?KafkaProducer(KafkaProperties.topic);??
  • ????????producerThread.start();??
  • ??
  • ????????KafkaConsumer?consumerThread?=?new?KafkaConsumer(KafkaProperties.topic);??
  • ????????consumerThread.start();??
  • ????}??
  • }??


  • 高級別的consumer
    下面是比較負載的發(fā)送接收的程序: [java]?view plaincopy
  • package?com.sohu.kafkademon;??
  • ??
  • import?java.util.HashMap;??
  • import?java.util.List;??
  • import?java.util.Map;??
  • import?java.util.Properties;??
  • ??
  • import?kafka.consumer.ConsumerConfig;??
  • import?kafka.consumer.ConsumerIterator;??
  • import?kafka.consumer.KafkaStream;??
  • import?kafka.javaapi.consumer.ConsumerConnector;??
  • ??
  • /**?
  • ?*?@author?leicui?bourne_cui@163.com?
  • ?*/??
  • public?class?KafkaConsumer?extends?Thread??
  • {??
  • ????private?final?ConsumerConnector?consumer;??
  • ????private?final?String?topic;??
  • ??
  • ????public?KafkaConsumer(String?topic)??
  • ????{??
  • ????????consumer?=?kafka.consumer.Consumer.createJavaConsumerConnector(??
  • ????????????????createConsumerConfig());??
  • ????????this.topic?=?topic;??
  • ????}??
  • ??
  • ????private?static?ConsumerConfig?createConsumerConfig()??
  • ????{??
  • ????????Properties?props?=?new?Properties();??
  • ????????props.put("zookeeper.connect",?KafkaProperties.zkConnect);??
  • ????????props.put("group.id",?KafkaProperties.groupId);??
  • ????????props.put("zookeeper.session.timeout.ms",?"40000");??
  • ????????props.put("zookeeper.sync.time.ms",?"200");??
  • ????????props.put("auto.commit.interval.ms",?"1000");??
  • ????????return?new?ConsumerConfig(props);??
  • ????}??
  • ??
  • ????@Override??
  • ????public?void?run()?{??
  • ????????Map<String,?Integer>?topicCountMap?=?new?HashMap<String,?Integer>();??
  • ????????topicCountMap.put(topic,?new?Integer(1));??
  • ????????Map<String,?List<KafkaStream<byte[],?byte[]>>>?consumerMap?=?consumer.createMessageStreams(topicCountMap);??
  • ????????KafkaStream<byte[],?byte[]>?stream?=?consumerMap.get(topic).get(0);??
  • ????????ConsumerIterator<byte[],?byte[]>?it?=?stream.iterator();??
  • ????????while?(it.hasNext())?{??
  • ????????????System.out.println("receive:"?+?new?String(it.next().message()));??
  • ????????????try?{??
  • ????????????????sleep(3000);??
  • ????????????}?catch?(InterruptedException?e)?{??
  • ????????????????e.printStackTrace();??
  • ????????????}??
  • ????????}??
  • ????}??
  • }??


  • 總結

    以上是生活随笔為你收集整理的漫游kafka实战篇之搭建Kafka开发环境的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。