日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Kafka学习笔记-Java简单操作

發布時間:2023/12/15 java 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka学习笔记-Java简单操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Maven依賴包:

?

[plain]?view plaincopy
  • <dependency>??
  • ????????<groupId>org.apache.kafka</groupId>??
  • ????????<artifactId>kafka-clients</artifactId>??
  • ????????<version>0.8.2.1</version>??
  • </dependency>??
  • ??????????
  • <dependency>??
  • ????<groupId>org.apache.kafka</groupId>??
  • ????<artifactId>kafka_2.11</artifactId>??
  • ????<version>0.8.2.1</version>??
  • </dependency>??

  • 代碼如下:

    ?

    ?

    [java]?view plaincopy
  • import?java.util.Properties;??
  • ??
  • import?org.apache.kafka.clients.producer.Callback;??
  • import?org.apache.kafka.clients.producer.KafkaProducer;??
  • import?org.apache.kafka.clients.producer.ProducerRecord;??
  • import?org.apache.kafka.clients.producer.RecordMetadata;??
  • import?org.slf4j.Logger;??
  • import?org.slf4j.LoggerFactory;??
  • ??
  • public?class?KafkaProducerTest?{??
  • ??????
  • ????private?static?final?Logger?LOG?=?LoggerFactory.getLogger(KafkaProducerTest.class);??
  • ??????
  • ????private?static?Properties?properties?=?null;??
  • ??????
  • ????static?{??
  • ????????properties?=?new?Properties();??
  • ????????properties.put("bootstrap.servers",?"centos.master:9092,centos.slave1:9092,centos.slave2:9092");??
  • ????????properties.put("producer.type",?"sync");??
  • ????????properties.put("request.required.acks",?"1");??
  • ????????properties.put("serializer.class",?"kafka.serializer.DefaultEncoder");??
  • ????????properties.put("partitioner.class",?"kafka.producer.DefaultPartitioner");??
  • ????????properties.put("key.serializer",?"org.apache.kafka.common.serialization.ByteArraySerializer");??
  • //??????properties.put("key.serializer",?"org.apache.kafka.common.serialization.StringSerializer");??
  • ????????properties.put("value.serializer",?"org.apache.kafka.common.serialization.ByteArraySerializer");??
  • //??????properties.put("value.serializer",?"org.apache.kafka.common.serialization.StringSerializer");??
  • ????}??
  • ??????
  • ????public?void?produce()?{??
  • ????????KafkaProducer<byte[],?byte[]>?kafkaProducer?=?new?KafkaProducer<byte[],byte[]>(properties);??
  • ????????ProducerRecord<byte[],byte[]>?kafkaRecord?=?new?ProducerRecord<byte[],byte[]>(??
  • ????????????????"test",?"kkk".getBytes(),?"vvv".getBytes());??
  • ????????kafkaProducer.send(kafkaRecord,?new?Callback()?{??
  • ????????????public?void?onCompletion(RecordMetadata?metadata,?Exception?e)?{??
  • ????????????????if(null?!=?e)?{??
  • ????????????????????LOG.info("the?offset?of?the?send?record?is?{}",?metadata.offset());??
  • ????????????????????LOG.error(e.getMessage(),?e);??
  • ????????????????}??
  • ????????????????LOG.info("complete!");??
  • ????????????}??
  • ????????});??
  • ????????kafkaProducer.close();??
  • ????}??
  • ??
  • ????public?static?void?main(String[]?args)?{??
  • ????????KafkaProducerTest?kafkaProducerTest?=?new?KafkaProducerTest();??
  • ????????for?(int?i?=?0;?i?<?10;?i++)?{??
  • ????????????kafkaProducerTest.produce();??
  • ????????}??
  • ????}??
  • }??
  • ?

    [java]?view plaincopy
  • import?java.util.List;??
  • import?java.util.Map;??
  • import?java.util.Properties;??
  • ??
  • import?org.apache.kafka.clients.consumer.ConsumerConfig;??
  • import?org.apache.kafka.clients.consumer.ConsumerRecord;??
  • import?org.apache.kafka.clients.consumer.ConsumerRecords;??
  • import?org.apache.kafka.clients.consumer.KafkaConsumer;??
  • import?org.slf4j.Logger;??
  • import?org.slf4j.LoggerFactory;??
  • ??
  • public?class?KafkaConsumerTest?{??
  • ??????
  • ????private?static?final?Logger?LOG?=?LoggerFactory.getLogger(KafkaConsumerTest.class);??
  • ??????
  • ????public?static?void?main(String[]?args)?{??
  • ????????Properties?properties?=?new?Properties();??
  • ????????properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,???
  • ????????????????"centos.master:9092,centos.slave1:9092,centos.slave2:9092");??
  • ????????properties.put(ConsumerConfig.GROUP_ID_CONFIG,?"test-consumer-group");??????????????
  • ????????properties.put(ConsumerConfig.SESSION_TIMEOUT_MS,?"1000");??????????????
  • ????????properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"true");??
  • ????????properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,?"range");??
  • //??????properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,?"roundrobin");??
  • ????????properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,?"10000");????
  • ????????properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,???
  • ????????????????"org.apache.kafka.common.serialization.ByteArrayDeserializer");??
  • ????????properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,???
  • ????????????????"org.apache.kafka.common.serialization.ByteArrayDeserializer");??
  • ??????????
  • ????????KafkaConsumer<byte[],?byte[]>?kafkaConsumer?=?new?KafkaConsumer<byte[],?byte[]>(properties);??
  • ????????kafkaConsumer.subscribe("test");??
  • //??????kafkaConsumer.subscribe("*");??
  • ????????boolean?isRunning?=?true;??????????????
  • ????????while(isRunning)?{??
  • ????????????Map<String,?ConsumerRecords<byte[],?byte[]>>?results?=?kafkaConsumer.poll(100);??
  • ????????????if?(null?!=?results)?{??
  • ????????????????for?(Map.Entry<String,?ConsumerRecords<byte[],?byte[]>>?entry?:?results.entrySet())?{??
  • ????????????????????LOG.info("topic?{}",?entry.getKey());??
  • ????????????????????ConsumerRecords<byte[],?byte[]>?consumerRecords?=?entry.getValue();??
  • ????????????????????List<ConsumerRecord<byte[],?byte[]>>?records?=?consumerRecords.records();??
  • ????????????????????for?(int?i?=?0,?len?=?records.size();?i?<?len;?i++)?{??
  • ????????????????????????ConsumerRecord<byte[],?byte[]>?consumerRecord?=?records.get(i);??
  • ????????????????????????LOG.info("topic?{}?partition?{}",?consumerRecord.topic(),?consumerRecord.partition());??
  • ????????????????????????try?{??
  • ????????????????????????????LOG.info("offset?{}?value?{}",?consumerRecord.offset(),?new?String(consumerRecord.value()));??
  • ????????????????????????}?catch?(Exception?e)?{??
  • ????????????????????????????LOG.error(e.getMessage(),?e);??
  • ????????????????????????}??
  • ????????????????????}??
  • ????????????????}??
  • ????????????}??
  • ????????}??
  • ??????????
  • ????????kafkaConsumer.close();????
  • ??????????
  • ????}??
  • ??
  • }??
  • ?

    發現KafkaConsumer的poll方法未實現

    ?

    [java]?view plaincopy
  • @Override??
  • public?Map<String,?ConsumerRecords<K,V>>?poll(long?timeout)?{??
  • ?????//?TODO?Auto-generated?method?stub??
  • ?????return?null;??
  • }??

  • 后改為kafka.javaapi.consumer.SimpleConsumer實現,正常運行

    ?

    [java]?view plaincopy
  • import?java.nio.ByteBuffer;??
  • import?java.util.ArrayList;??
  • import?java.util.Collections;??
  • import?java.util.HashMap;??
  • import?java.util.List;??
  • import?java.util.Map;??
  • ??
  • import?kafka.api.FetchRequest;??
  • import?kafka.api.FetchRequestBuilder;??
  • import?kafka.api.PartitionOffsetRequestInfo;??
  • import?kafka.cluster.Broker;??
  • import?kafka.common.ErrorMapping;??
  • import?kafka.common.TopicAndPartition;??
  • import?kafka.javaapi.FetchResponse;??
  • import?kafka.javaapi.OffsetRequest;??
  • import?kafka.javaapi.OffsetResponse;??
  • import?kafka.javaapi.PartitionMetadata;??
  • import?kafka.javaapi.TopicMetadata;??
  • import?kafka.javaapi.TopicMetadataRequest;??
  • import?kafka.javaapi.TopicMetadataResponse;??
  • import?kafka.javaapi.consumer.SimpleConsumer;??
  • import?kafka.message.MessageAndOffset;??
  • ??
  • public?class?KafkaSimpleConsumerTest?{??
  • ??????
  • ????private?List<String>?borkerList?=?new?ArrayList<String>();????
  • ????????
  • ????public?KafkaSimpleConsumerTest()?{????
  • ????????borkerList?=?new?ArrayList<String>();????
  • ????}????
  • ????
  • ????public?static?void?main(String?args[])?{????
  • ????????KafkaSimpleConsumerTest?kafkaSimpleConsumer?=?new?KafkaSimpleConsumerTest();????
  • ????????//?最大讀取消息數量????
  • ????????long?maxReadNum?=?Long.parseLong("3");????
  • ????????//?訂閱的topic????
  • ????????String?topic?=?"test";????
  • ????????//?查找的分區????
  • ????????int?partition?=?Integer.parseInt("0");????
  • ????????//?broker節點??
  • ????????List<String>?seeds?=?new?ArrayList<String>();????
  • ????????seeds.add("centos.master");????
  • ????????seeds.add("centos.slave1");????
  • ????????seeds.add("centos.slave2");????
  • ????????//?端口????
  • ????????int?port?=?Integer.parseInt("9092");????
  • ????????try?{????
  • ????????????kafkaSimpleConsumer.run(maxReadNum,?topic,?partition,?seeds,?port);????
  • ????????}?catch?(Exception?e)?{????
  • ????????????System.out.println("Oops:"?+?e);????
  • ????????????e.printStackTrace();????
  • ????????}????
  • ????}????
  • ????
  • ????public?void?run(long?maxReadNum,?String?topic,?int?partition,?List<String>?seedBrokers,?int?port)?throws?Exception?{????
  • ????????//?獲取指定topic?partition的元數據????
  • ????????PartitionMetadata?metadata?=?findLeader(seedBrokers,?port,?topic,?partition);????
  • ????????if?(metadata?==?null)?{????
  • ????????????System.out.println("can't?find?metadata?for?topic?and?partition.?exit");????
  • ????????????return;????
  • ????????}????
  • ????????if?(metadata.leader()?==?null)?{????
  • ????????????System.out.println("can't?find?leader?for?topic?and?partition.?exit");????
  • ????????????return;????
  • ????????}????
  • ????????String?leadBroker?=?metadata.leader().host();????
  • ????????String?clientName?=?"client_"?+?topic?+?"_"?+?partition;????
  • ????
  • ????????SimpleConsumer?consumer?=?new?SimpleConsumer(leadBroker,?port,?100000,?64?*?1024,?clientName);????
  • ????????long?readOffset?=?getLastOffset(consumer,?topic,?partition,?kafka.api.OffsetRequest.EarliestTime(),?clientName);????
  • ????????int?numErrors?=?0;????
  • ????????while?(maxReadNum?>?0)?{????
  • ????????????if?(consumer?==?null)?{????
  • ????????????????consumer?=?new?SimpleConsumer(leadBroker,?port,?100000,?64?*?1024,?clientName);????
  • ????????????}????
  • ????????????FetchRequest?req?=?new?FetchRequestBuilder().clientId(clientName).addFetch(topic,?partition,?readOffset,?100000).build();????
  • ????????????FetchResponse?fetchResponse?=?consumer.fetch(req);????
  • ????
  • ????????????if?(fetchResponse.hasError())?{????
  • ????????????????numErrors++;????
  • ????????????????short?code?=?fetchResponse.errorCode(topic,?partition);????
  • ????????????????System.out.println("error?fetching?data?from?the?broker:"?+?leadBroker?+?"?reason:?"?+?code);????
  • ????????????????if?(numErrors?>?5)????
  • ????????????????????break;????
  • ????????????????if?(code?==?ErrorMapping.OffsetOutOfRangeCode())?{????
  • ????????????????????readOffset?=?getLastOffset(consumer,?topic,?partition,?kafka.api.OffsetRequest.LatestTime(),?clientName);????
  • ????????????????????continue;????
  • ????????????????}????
  • ????????????????consumer.close();????
  • ????????????????consumer?=?null;????
  • ????????????????leadBroker?=?findNewLeader(leadBroker,?topic,?partition,?port);????
  • ????????????????continue;????
  • ????????????}????
  • ????????????numErrors?=?0;????
  • ????
  • ????????????long?numRead?=?0;????
  • ????????????for?(MessageAndOffset?messageAndOffset?:?fetchResponse.messageSet(topic,?partition))?{????
  • ????????????????long?currentOffset?=?messageAndOffset.offset();????
  • ????????????????if?(currentOffset?<?readOffset)?{????
  • ????????????????????System.out.println("found?an?old?offset:?"?+?currentOffset?+?"?expecting:?"?+?readOffset);????
  • ????????????????????continue;????
  • ????????????????}????
  • ????
  • ????????????????readOffset?=?messageAndOffset.nextOffset();????
  • ????????????????ByteBuffer?payload?=?messageAndOffset.message().payload();????
  • ????
  • ????????????????byte[]?bytes?=?new?byte[payload.limit()];????
  • ????????????????payload.get(bytes);????
  • ????????????????System.out.println(String.valueOf(messageAndOffset.offset())?+?":?"?+?new?String(bytes,?"UTF-8"));????
  • ????????????????numRead++;????
  • ????????????????maxReadNum--;????
  • ????????????}????
  • ????
  • ????????????if?(numRead?==?0)?{????
  • ????????????????try?{????
  • ????????????????????Thread.sleep(1000);????
  • ????????????????}?catch?(InterruptedException?ie)?{????
  • ????????????????}????
  • ????????????}????
  • ????????}????
  • ????????if?(consumer?!=?null)????
  • ????????????consumer.close();????
  • ????}????
  • ?????
  • ????/**?
  • ?????*?從活躍的Broker列表中找出指定Topic、Partition中的Leader?Broker?
  • ?????*?@param?seedBrokers?
  • ?????*?@param?port?
  • ?????*?@param?topic?
  • ?????*?@param?partition?
  • ?????*?@return?
  • ?????*/??
  • ????private?PartitionMetadata?findLeader(List<String>?seedBrokers,?int?port,?String?topic,?int?partition)?{????
  • ????????PartitionMetadata?partitionMetadata?=?null;????
  • ????????loop:?for?(String?seedBroker?:?seedBrokers)?{????
  • ????????????SimpleConsumer?consumer?=?null;????
  • ????????????try?{????
  • ????????????????consumer?=?new?SimpleConsumer(seedBroker,?port,?100000,?64?*?1024,?"leaderLookup");????
  • ????????????????List<String>?topics?=?Collections.singletonList(topic);????
  • ????????????????TopicMetadataRequest?topicMetadataRequest?=?new?TopicMetadataRequest(topics);????
  • ????????????????TopicMetadataResponse?topicMetadataResponse?=?consumer.send(topicMetadataRequest);????
  • ????
  • ????????????????List<TopicMetadata>?topicMetadatas?=?topicMetadataResponse.topicsMetadata();????
  • ????????????????for?(TopicMetadata?topicMetadata?:?topicMetadatas)?{????
  • ????????????????????for?(PartitionMetadata?pMetadata?:?topicMetadata.partitionsMetadata())?{????
  • ????????????????????????if?(pMetadata.partitionId()?==?partition)?{????
  • ????????????????????????????partitionMetadata?=?pMetadata;????
  • ????????????????????????????break?loop;????
  • ????????????????????????}????
  • ????????????????????}????
  • ????????????????}????
  • ????????????}?catch?(Exception?e)?{????
  • ????????????????System.out.println("error?communicating?with?broker?["?+?seedBroker?+?"]?to?find?leader?for?["?+?topic?+?",?"?+?partition?+?"]?reason:?"?+?e);????
  • ????????????}?finally?{????
  • ????????????????if?(consumer?!=?null)????
  • ????????????????????consumer.close();????
  • ????????????}????
  • ????????}????
  • ????????if?(partitionMetadata?!=?null)?{????
  • ????????????borkerList.clear();????
  • ????????????for?(Broker?replica?:?partitionMetadata.replicas())?{????
  • ????????????????borkerList.add(replica.host());????
  • ????????????}????
  • ????????}????
  • ????????return?partitionMetadata;????
  • ????}????
  • ????
  • ????public?static?long?getLastOffset(SimpleConsumer?consumer,?String?topic,?int?partition,?long?whichTime,?String?clientName)?{????
  • ????????TopicAndPartition?topicAndPartition?=?new?TopicAndPartition(topic,?partition);????
  • ????????Map<TopicAndPartition,?PartitionOffsetRequestInfo>?requestInfo?=?new?HashMap<TopicAndPartition,?PartitionOffsetRequestInfo>();????
  • ????????requestInfo.put(topicAndPartition,?new?PartitionOffsetRequestInfo(whichTime,?1));????
  • ????????OffsetRequest?request?=?new?OffsetRequest(requestInfo,?kafka.api.OffsetRequest.CurrentVersion(),?clientName);????
  • ????????OffsetResponse?response?=?consumer.getOffsetsBefore(request);????
  • ????????if?(response.hasError())?{????
  • ????????????System.out.println("error?fetching?data?offset?data?the?broker.?reason:?"?+?response.errorCode(topic,?partition));????
  • ????????????return?0;????
  • ????????}????
  • ????????long[]?offsets?=?response.offsets(topic,?partition);????
  • ????????return?offsets[0];????
  • ????}????
  • ????
  • ????private?String?findNewLeader(String?oldLeader,?String?topic,?int?partition,?int?port)?throws?Exception?{????
  • ????????for?(int?i?=?0;?i?<?3;?i++)?{????
  • ????????????boolean?goToSleep?=?false;????
  • ????????????PartitionMetadata?metadata?=?findLeader(borkerList,?port,?topic,?partition);????
  • ????????????if?(metadata?==?null)?{????
  • ????????????????goToSleep?=?true;????
  • ????????????}?else?if?(metadata.leader()?==?null)?{????
  • ????????????????goToSleep?=?true;????
  • ????????????}?else?if?(oldLeader.equalsIgnoreCase(metadata.leader().host())?&&?i?==?0)?{????
  • ????????????????goToSleep?=?true;????
  • ????????????}?else?{????
  • ????????????????return?metadata.leader().host();????
  • ????????????}????
  • ????????????if?(goToSleep)?{????
  • ????????????????try?{????
  • ????????????????????Thread.sleep(1000);????
  • ????????????????}?catch?(InterruptedException?ie)?{????
  • ????????????????}????
  • ????????????}????
  • ????????}????
  • ????????System.out.println("unable?to?find?new?leader?after?broker?failure.?exit");????
  • ????????throw?new?Exception("unable?to?find?new?leader?after?broker?failure.?exit");????
  • ????}????
  • ????
  • }????
  • 轉載于:https://www.cnblogs.com/edison2012/p/5759223.html

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的Kafka学习笔记-Java简单操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。