生活随笔
收集整理的這篇文章主要介紹了
Kafka学习笔记-Java简单操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Maven依賴包:
?
[plain] view plaincopy
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
代碼如下:
?
?
[java] view plaincopy
import?java.util.Properties;????import?org.apache.kafka.clients.producer.Callback;??import?org.apache.kafka.clients.producer.KafkaProducer;??import?org.apache.kafka.clients.producer.ProducerRecord;??import?org.apache.kafka.clients.producer.RecordMetadata;??import?org.slf4j.Logger;??import?org.slf4j.LoggerFactory;????public?class?KafkaProducerTest?{????????????private?static?final?Logger?LOG?=?LoggerFactory.getLogger(KafkaProducerTest.class);????????????private?static?Properties?properties?=?null;????????????static?{??????????properties?=?new?Properties();??????????properties.put("bootstrap.servers",?"centos.master:9092,centos.slave1:9092,centos.slave2:9092");??????????properties.put("producer.type",?"sync");??????????properties.put("request.required.acks",?"1");??????????properties.put("serializer.class",?"kafka.serializer.DefaultEncoder");??????????properties.put("partitioner.class",?"kafka.producer.DefaultPartitioner");??????????properties.put("key.serializer",?"org.apache.kafka.common.serialization.ByteArraySerializer");??????????properties.put("value.serializer",?"org.apache.kafka.common.serialization.ByteArraySerializer");??????}????????????public?void?produce()?{??????????KafkaProducer<byte[],?byte[]>?kafkaProducer?=?new?KafkaProducer<byte[],byte[]>(properties);??????????ProducerRecord<byte[],byte[]>?kafkaRecord?=?new?ProducerRecord<byte[],byte[]>(??????????????????"test",?"kkk".getBytes(),?"vvv".getBytes());??????????kafkaProducer.send(kafkaRecord,?new?Callback()?{??????????????public?void?onCompletion(RecordMetadata?metadata,?Exception?e)?{??????????????????if(null?!=?e)?{??????????????????????LOG.info("the?offset?of?the?send?record?is?{}",?metadata.offset());??????????????????????LOG.error(e.getMessage(),?e);??????????????????}??????????????????LOG.info("complete!");??????????????}??????????});??????????kafkaProducer.close();??????}????????public?static?void?main(String[]?args)?{??????????KafkaProducerTest?kafkaProducerTest?=?new?KafkaProducerTest();???????
???for?(int?i?=?0;?i?<?10;?i++)?{??????????????kafkaProducerTest.produce();??????????}??????}??}?? ?
[java] view plaincopy
import?java.util.List;??import?java.util.Map;??import?java.util.Properties;????import?org.apache.kafka.clients.consumer.ConsumerConfig;??import?org.apache.kafka.clients.consumer.ConsumerRecord;??import?org.apache.kafka.clients.consumer.ConsumerRecords;??import?org.apache.kafka.clients.consumer.KafkaConsumer;??import?org.slf4j.Logger;??import?org.slf4j.LoggerFactory;????public?class?KafkaConsumerTest?{????????????private?static?final?Logger?LOG?=?LoggerFactory.getLogger(KafkaConsumerTest.class);????????????public?static?void?main(String[]?args)?{??????????Properties?properties?=?new?Properties();??????????properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,???????????????????"centos.master:9092,centos.slave1:9092,centos.slave2:9092");??????????properties.put(ConsumerConfig.GROUP_ID_CONFIG,?"test-consumer-group");??????????????????????properties.put(ConsumerConfig.SESSION_TIMEOUT_MS,?"1000");??????????????????????properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"true");??????????properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,?"range");??????????properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,?"10000");????????????properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,???????????????????"org.apache.kafka.common.serialization.ByteArrayDeserializer");??????????properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,???????????????????"org.apache.kafka.common.serialization.ByteArrayDeserializer");????????????????????KafkaConsumer<byte[],?byte[]>?kafkaConsumer?=?new?KafkaConsumer<byte[],?byte[]>(properties);??????????kafkaConsumer.subscribe("test");??????????boolean?isRunning?=?true;??????????????????????while(isRunning)?{??????????????Map<String,?ConsumerRecords<byte[],?byte[]>>?results?=?kafkaConsumer.poll(100);??????????????if?(null?!=?results)?{??????????????????for?(Map.Entry<String,?ConsumerRecords<byte[],?byte[]>>?entry?:?results.entrySet())?{??????????????????????LOG.info("topic?{}",?entry.getKey());??????????????????
????ConsumerRecords<byte[],?byte[]>?consumerRecords?=?entry.getValue();??????????????????????List<ConsumerRecord<byte[],?byte[]>>?records?=?consumerRecords.records();??????????????????????for?(int?i?=?0,?len?=?records.size();?i?<?len;?i++)?{??????????????????????????ConsumerRecord<byte[],?byte[]>?consumerRecord?=?records.get(i);??????????????????????????LOG.info("topic?{}?partition?{}",?consumerRecord.topic(),?consumerRecord.partition());??????????????????????????try?{??????????????????????????????LOG.info("offset?{}?value?{}",?consumerRecord.offset(),?new?String(consumerRecord.value()));??????????????????????????}?catch?(Exception?e)?{??????????????????????????????LOG.error(e.getMessage(),?e);??????????????????????????}??????????????????????}??????????????????}??????????????}??????????}????????????????????kafkaConsumer.close();??????????????????}????}?? ?
發現KafkaConsumer的poll方法未實現
?
[java] view plaincopy
/**
 * Stub implementation: ignores {@code timeout} and always returns
 * {@code null}, i.e. no records are ever delivered to the caller.
 */
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // Not implemented: no fetch is performed here.
    return null;
}
后改為kafka.javaapi.consumer.SimpleConsumer實現,正常運行
?
[java] view plaincopy
import?java.nio.ByteBuffer;??import?java.util.ArrayList;??import?java.util.Collections;??import?java.util.HashMap;??import?java.util.List;??import?java.util.Map;????import?kafka.api.FetchRequest;??import?kafka.api.FetchRequestBuilder;??import?kafka.api.PartitionOffsetRequestInfo;??import?kafka.cluster.Broker;??import?kafka.common.ErrorMapping;??import?kafka.common.TopicAndPartition;??import?kafka.javaapi.FetchResponse;??import?kafka.javaapi.OffsetRequest;??import?kafka.javaapi.OffsetResponse;??import?kafka.javaapi.PartitionMetadata;??import?kafka.javaapi.TopicMetadata;??import?kafka.javaapi.TopicMetadataRequest;??import?kafka.javaapi.TopicMetadataResponse;??import?kafka.javaapi.consumer.SimpleConsumer;??import?kafka.message.MessageAndOffset;????public?class?KafkaSimpleConsumerTest?{????????????private?List<String>?borkerList?=?new?ArrayList<String>();????????????????public?KafkaSimpleConsumerTest()?{????????????borkerList?=?new?ArrayList<String>();????????}????????????public?static?void?main(String?args[])?{????????????KafkaSimpleConsumerTest?kafkaSimpleConsumer?=?new?KafkaSimpleConsumerTest();????????????????????long?maxReadNum?=?Long.parseLong("3");????????????????????String?topic?=?"test";????????????????????int?partition?=?Integer.parseInt("0");????????????????????List<String>?seeds?=?new?ArrayList<String>();????????????seeds.add("centos.master");????????????seeds.add("centos.slave1");????????????seeds.add("centos.slave2");????????????????????int?port?=?Integer.parseInt("9092");????????????try?{????????????????kafkaSimpleConsumer.run(maxReadNum,?topic,?partition,?seeds,?port);????????????}?catch?(Exception?e)?{????????????????System.out.println("Oops:"?+?e);????????????????e.printStackTrace();????????????}????????}????????????public?void?run(long?maxReadNum,?String?topic,?int?partition,?List<String>?seedBrokers,?int?port)?throws?Exception?{????????????????????PartitionMetadata?metadata?=?findLeader(seedBrokers,?port,?topic,?partition);????????????if?(metadata?==
?null)?{????????????????System.out.println("can't?find?metadata?for?topic?and?partition.?exit");????????????????return;????????????}????????????if?(metadata.leader()?==?null)?{????????????????System.out.println("can't?find?leader?for?topic?and?partition.?exit");????????????????return;????????????}????????????String?leadBroker?=?metadata.leader().host();????????????String?clientName?=?"client_"?+?topic?+?"_"?+?partition;????????????????SimpleConsumer?consumer?=?new?SimpleConsumer(leadBroker,?port,?100000,?64?*?1024,?clientName);????????????long?readOffset?=?getLastOffset(consumer,?topic,?partition,?kafka.api.OffsetRequest.EarliestTime(),?clientName);????????????int?numErrors?=?0;????????????while?(maxReadNum?>?0)?{????????????????if?(consumer?==?null)?{????????????????????consumer?=?new?SimpleConsumer(leadBroker,?port,?100000,?64?*?1024,?clientName);????????????????}????????????????FetchRequest?req?=?new?FetchRequestBuilder().clientId(clientName).addFetch(topic,?partition,?readOffset,?100000).build();????????????????FetchResponse?fetchResponse?=?consumer.fetch(req);????????????????????if?(fetchResponse.hasError())?{????????????????????numErrors++;????????????????????short?code?=?fetchResponse.errorCode(topic,?partition);????????????????????System.out.println("error?fetching?data?from?the?broker:"?+?leadBroker?+?"?reason:?"?+?code);????????????????????if?(numErrors?>?5)????????????????????????break;????????????????????if?(code?==?ErrorMapping.OffsetOutOfRangeCode())?{????????????????????????readOffset?=?getLastOffset(consumer,?topic,?partition,?kafka.api.OffsetRequest.LatestTime(),?clientName);????????????????????????continue;????????????????????}????????????????????consumer.close();????????????????????consumer?=?null;????????????????????leadBroker?=?findNewLeader(leadBroker,?topic,?partition,?port);????????????????????continue;????????????????}????????????????numErrors?=?0;????????????????????long?numRead?=?0;????????????????for?(MessageAndOffset?messageAndOffset?:?f
etchResponse.messageSet(topic,?partition))?{????????????????????long?currentOffset?=?messageAndOffset.offset();????????????????????if?(currentOffset?<?readOffset)?{????????????????????????System.out.println("found?an?old?offset:?"?+?currentOffset?+?"?expecting:?"?+?readOffset);????????????????????????continue;????????????????????}????????????????????????readOffset?=?messageAndOffset.nextOffset();????????????????????ByteBuffer?payload?=?messageAndOffset.message().payload();????????????????????????byte[]?bytes?=?new?byte[payload.limit()];????????????????????payload.get(bytes);????????????????????System.out.println(String.valueOf(messageAndOffset.offset())?+?":?"?+?new?String(bytes,?"UTF-8"));????????????????????numRead++;????????????????????maxReadNum--;????????????????}????????????????????if?(numRead?==?0)?{????????????????????try?{????????????????????????Thread.sleep(1000);????????????????????}?catch?(InterruptedException?ie)?{????????????????????}????????????????}????????????}????????????if?(consumer?!=?null)????????????????consumer.close();????????}?????????????????private?PartitionMetadata?findLeader(List<String>?seedBrokers,?int?port,?String?topic,?int?partition)?{????????????PartitionMetadata?partitionMetadata?=?null;????????????loop:?for?(String?seedBroker?:?seedBrokers)?{????????????????SimpleConsumer?consumer?=?null;????????????????try?{????????????????????consumer?=?new?SimpleConsumer(seedBroker,?port,?100000,?64?*?1024,?"leaderLookup");????????????????????List<String>?topics?=?Collections.singletonList(topic);????????????????????TopicMetadataRequest?topicMetadataRequest?=?new?TopicMetadataRequest(topics);????????????????????TopicMetadataResponse?topicMetadataResponse?=?consumer.send(topicMetadataRequest);????????????????????????List<TopicMetadata>?topicMetadatas?=?topicMetadataResponse.topicsMetadata();????????????????????for?(TopicMetadata?topicMetadata?:?topicMetadatas)?{????????????????????????for?(PartitionMetadata?pMetadata?:?topicMetadata.partitionsM
etadata())?{????????????????????????????if?(pMetadata.partitionId()?==?partition)?{????????????????????????????????partitionMetadata?=?pMetadata;????????????????????????????????break?loop;????????????????????????????}????????????????????????}????????????????????}????????????????}?catch?(Exception?e)?{????????????????????System.out.println("error?communicating?with?broker?["?+?seedBroker?+?"]?to?find?leader?for?["?+?topic?+?",?"?+?partition?+?"]?reason:?"?+?e);????????????????}?finally?{????????????????????if?(consumer?!=?null)????????????????????????consumer.close();????????????????}????????????}????????????if?(partitionMetadata?!=?null)?{????????????????borkerList.clear();????????????????for?(Broker?replica?:?partitionMetadata.replicas())?{????????????????????borkerList.add(replica.host());????????????????}????????????}????????????return?partitionMetadata;????????}????????????public?static?long?getLastOffset(SimpleConsumer?consumer,?String?topic,?int?partition,?long?whichTime,?String?clientName)?{????????????TopicAndPartition?topicAndPartition?=?new?TopicAndPartition(topic,?partition);????????????Map<TopicAndPartition,?PartitionOffsetRequestInfo>?requestInfo?=?new?HashMap<TopicAndPartition,?PartitionOffsetRequestInfo>();????????????requestInfo.put(topicAndPartition,?new?PartitionOffsetRequestInfo(whichTime,?1));????????????OffsetRequest?request?=?new?OffsetRequest(requestInfo,?kafka.api.OffsetRequest.CurrentVersion(),?clientName);????????????OffsetResponse?response?=?consumer.getOffsetsBefore(request);????????????if?(response.hasError())?{????????????????System.out.println("error?fetching?data?offset?data?the?broker.?reason:?"?+?response.errorCode(topic,?partition));????????????????return?0;????????????}????????????long[]?offsets?=?response.offsets(topic,?partition);????????????return?offsets[0];????????}????????????private?String?findNewLeader(String?oldLeader,?String?topic,?int?partition,?int?port)?throws?Exception?{????????????for?(int?i?=?0;?i?<?3;?i++)?{??????
??????????boolean?goToSleep?=?false;????????????????PartitionMetadata?metadata?=?findLeader(borkerList,?port,?topic,?partition);????????????????if?(metadata?==?null)?{????????????????????goToSleep?=?true;????????????????}?else?if?(metadata.leader()?==?null)?{????????????????????goToSleep?=?true;????????????????}?else?if?(oldLeader.equalsIgnoreCase(metadata.leader().host())?&&?i?==?0)?{????????????????????goToSleep?=?true;????????????????}?else?{????????????????????return?metadata.leader().host();????????????????}????????????????if?(goToSleep)?{????????????????????try?{????????????????????????Thread.sleep(1000);????????????????????}?catch?(InterruptedException?ie)?{????????????????????}????????????????}????????????}????????????System.out.println("unable?to?find?new?leader?after?broker?failure.?exit");????????????throw?new?Exception("unable?to?find?new?leader?after?broker?failure.?exit");????????}????????}????
轉載于:https://www.cnblogs.com/edison2012/p/5759223.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的Kafka学习笔记-Java简单操作的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。