A Walkthrough of Kafka in Practice: Setting Up the Kafka Development Environment
Reprinted; please credit the original source: http://blog.csdn.net/honglei915/article/details/37563647
A companion Kafka video tutorial has been released at the same time; feel free to watch it!
In the previous article we set up a Kafka server and used Kafka's command-line tools to create a topic and to send and receive messages. Now let's set up a Kafka development environment.
Adding the dependency
To build against Kafka we need its jar packages on the classpath. One option is simply to add the jars under the lib directory of the Kafka distribution to the project's classpath, which is the easy way. Here, however, we use the more common approach: managing the dependency with Maven. After creating the Maven project, add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>
After adding the dependency, two of its transitive jar dependencies may fail to resolve. Download those two jars from the link in the original post and unpack them; you then have two options: install the jars into your local repository with Maven's install command (a hedged sketch of that command follows below), or copy the unpacked folders directly into the com directory of your local Maven repository. For example, my local repository is at d:\mvn; the resulting directory layout is shown as a screenshot in the original post.
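If you take the first option, Maven's install-file goal registers a downloaded jar in your local repository. This is only a hedged sketch: the post does not name the two jars, so the file path and the groupId/artifactId/version coordinates below are placeholders that you should replace with whatever your build actually reports as missing:

mvn install:install-file -Dfile=D:\downloads\missing-dependency.jar -DgroupId=com.example -DartifactId=missing-dependency -Dversion=1.0 -Dpackaging=jar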
Configuring the program
For more up-to-date and complete API programming examples, see: http://blog.csdn.net/honglei915/article/details/37697655
First comes an interface that acts as a configuration file, holding the various Kafka connection parameters:

package com.sohu.kafkademon;

public interface KafkaProperties
{
    final static String zkConnect = "10.22.10.139:2181";
    final static String groupId = "group1";
    final static String topic = "topic1";
    final static String kafkaServerURL = "10.22.10.139";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";
}
producer
package com.sohu.kafkademon;

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @author leicui bourne_cui@163.com
 */
public class KafkaProducer extends Thread
{
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.22.10.139:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
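As a small extension (this is not part of the original post, just a sketch against the same 0.8 producer API), the old producer can also send keyed messages so that messages with the same key always land in the same partition, and it accepts reliability settings such as request.required.acks. The class name and the key below are made up for illustration:

package com.sohu.kafkademon;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedProducerSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        // StringEncoder is used for both key and value here, because
        // key.serializer.class falls back to serializer.class when unset.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", KafkaProperties.kafkaServerURL + ":" + KafkaProperties.kafkaServerPort);
        // Wait for the partition leader to acknowledge each message.
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Messages sharing a key are hashed to the same partition by the default partitioner.
        producer.send(new KeyedMessage<String, String>(KafkaProperties.topic, "key-1", "Hello, Kafka"));
        producer.close();
    }
}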
consumer
package com.sohu.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @author leicui bourne_cui@163.com
 */
public class KafkaConsumer extends Thread
{
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Simple send and receive
Run the following program to send and receive messages in a simple way:

package com.sohu.kafkademon;

/**
 * @author leicui bourne_cui@163.com
 */
public class KafkaConsumerProducerDemo
{
    public static void main(String[] args)
    {
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();

        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
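Provided the topic configured in KafkaProperties (topic1) exists on the broker, or the broker is set to create topics automatically, the producer thread should print a Send:Message_N line roughly every three seconds and the consumer thread should print the matching receive:Message_N lines.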
High-level consumer
The KafkaConsumer class shown above already uses Kafka's high-level consumer API: it obtains a ConsumerConnector through kafka.consumer.Consumer.createJavaConsumerConnector and reads messages from the streams returned by createMessageStreams.
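One thing that example never does is shut the connector down: run() loops forever and the consumer's ZooKeeper session stays open until the process is killed. Below is a minimal, self-contained sketch (not from the original post; the class name and the ten-message limit are illustrative) of reading a bounded number of messages with the same high-level API and then releasing the connections with shutdown():

package com.sohu.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BoundedConsumerSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        // Start from the earliest available offset the first time this group runs.
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector connector =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProperties.topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> it = streams.get(KafkaProperties.topic).get(0).iterator();

        int read = 0;
        // Stop after ten messages instead of looping forever; hasNext() blocks until a message arrives.
        while (read < 10 && it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            read++;
        }
        // Commit offsets (auto-commit is on by default) and release ZooKeeper and broker connections.
        connector.shutdown();
    }
}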