日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka的入门级API应用

發布時間:2023/12/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka的入门级API应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 1.Producer API
    • 1.1 消息發送流程
    • 1.2 異步發送API
      • 1.2.1 導入依賴
      • 1.2.2 添加log4j配置文件
      • 1.2.3 編寫代碼
        • 1.2.3.1 不帶回調函數的API
        • 1.2.3.2 帶回調函數的API
    • 1.3 自定義分區器
  • 2.Consumer API
    • 2.1 自動提交offset
      • 2.1.1 編寫代碼
    • 2.2 手動提交offset
      • 2.2.1 同步提交offset
      • 2.2.2 異步提交offset
    • 2.3 數據漏消費和重復消費分析
  • 3.自定義Interceptor
    • 3.1 攔截器原理
    • 3.2 攔截器案例
      • 3.2.1 需求
      • 3.2.2 增加時間戳攔截器
      • 3.2.3 計數攔截器
      • 3.2.4 Producer主程序
      • 3.2.5 測試

1.Producer API

1.1 消息發送流程

? Kafka的Producer發送消息采用異步發送的方式。在消息發送的過程中,涉及到了兩個線程—-main線程和Sender線程,以及一個線程共享變量—-RecordAccumulator。main線程將消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。

? 下圖為KafkaProducer發送消息流程:

注意:

兩個重要參數: 1.batch.size:sender發送數據的前提是數據積累到batch.size(一批數據的大小)。 2.lingertime:如果數據遲遲未達到batch.size,sender等待lingertime之后就會發送數據。

1.2 異步發送API

1.2.1 導入依賴

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.12.0</version></dependency> </dependencies>

1.2.2 添加log4j配置文件

<?xml version="1.0" encoding="UTF-8"?> <Configuration status="error" strict="true" name="XMLConfig"><Appenders><!-- 類型名為Console,名稱為必須屬性 --><Appender type="Console" name="STDOUT"><!-- 布局為PatternLayout的方式,輸出樣式為[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --><Layout type="PatternLayout"pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /></Appender></Appenders><Loggers><!-- 可加性為false --><Logger name="test" level="info" additivity="false"><AppenderRef ref="STDOUT" /></Logger><!-- root loggerConfig設置 --><Root level="info"><AppenderRef ref="STDOUT" /></Root></Loggers></Configuration>

1.2.3 編寫代碼

需要用到的類:

KafkaProducer:創建一個生產者對象,用來發送數據

ProducerConfig:獲取所需的一系列配置參數

ProducerRecord:每條數據都要封裝成一個ProducerRecord對象

1.2.3.1 不帶回調函數的API

CustomProducer.java

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重試次數props.put("retries",1);//批次大小props.put("batch.size",16384);//等待時間props.put("linger.ms",1);//RecordAccumilator緩沖區大小props.put("buffer.memory",33554432);//鍵和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//發送的消息內容為:[0,20)for (int i=0;i<20;i++){producer.send(new ProducerRecord<String,String>("topic_jx",String.valueOf(i),String.valueOf(i)));}producer.close();} }

測試:

1.在shell中先開啟consumer端:

如果沒有該Topic,會警告然后自動創建,不用擔心。

命令:

kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic topic_jx

2.運行ConusmerProducer代碼

3.查看Consumer收到的消息

1.2.3.2 帶回調函數的API

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分別是RecordMetadataException,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。

注意:當消息發送失敗后會自動重試,不需要我們在回調函數中手動調試。

對原ConsumerProducer.java做一下修改,修改后的程序代碼為:

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重試次數props.put("retries",1);//批次大小props.put("batch.size",16384);//等待時間props.put("linger.ms",1);//RecordAccumilator緩沖區大小props.put("buffer.memory",33554432);//鍵和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//在每次發送的數據的send方法中調用回調函數for (int i=0;i<20;i++){producer.send(new ProducerRecord<String, String>("topic_jx", String.valueOf(i), String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();} }

發送消息后會調用該方法打印成功與否,控制臺消息如下:

這success后跟的是生產者的偏移量。由于已經發送過20條(0-19),因此從20開始

Consumer端收到的消息:


1.3 自定義分區器

? MyPartitioner.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster;import java.util.Map;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class MyPartitionerx implements Partitioner {/***分區規則:value%2==0 放在0分區中,value%2!=0 放在1分區中* @param topic 主題* @param key 消息的key* @param keyBytes key序列化后的字節數組* @param value 消息的value* @param valueBytes value序列化后的字節數組* @param cluster 類似于MR中的context,有獲取各種參數的作用* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partition=1;if (Integer.parseInt(value.toString())%2==0){System.out.println(Integer.parseInt(value.toString())%2);partition=0;}else {System.out.println(Integer.parseInt(value.toString())%2);}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }

在Producer對應的類中指定該程序全類名。

ConsumerProducer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重試次數props.put("retries",1);//批次大小props.put("batch.size",16384);//等待時間props.put("linger.ms",1);//自定義分區器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitionerx.class.getName());//RecordAccumilator緩沖區大小props.put("buffer.memory",33554432);//鍵和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i=0;i<40;i++){producer.send(new ProducerRecord<String, String>("topic_mem", String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();} }

2.Consumer API

? 由于數據在Kafka中是持久化的,所以不用擔心數據丟失問題,因此,Consumer消費數據時的可靠性是可以保證的。

? 由于consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續消費。

所以offset的維護是Consumer消費數據是必須考慮的問題。


2.1 自動提交offset

2.1.1 編寫代碼

需要用到的類:

KafkaConsumer:創建一個消費者對象,用來消費數據。

ConsumerConfig:獲取所需的一系列配置參數。

ConsumerRecord:每條數據都要封裝成一個ConsumerRecord對象。

Kafka提供了自動提交offset的功能。自動提交Offset的相關參數:

enable.auto.commit:是否開啟自動提交offset功能。

auto.commit.interval.ms:自動提交offset的時間間隔。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//kafka集群服務器 props.put("bootstrap.servers", "yxp:9092");//消費者的組idprops.put("group.id", "test");//允許自動提交 props.put("enable.auto.commit", "true");//自動提交的間隔時間 props.put("auto.commit.interval.ms", "1000");//key的反序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value的反序列化器 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定提交到的topic consumer.subscribe(Arrays.asList("topic_mem"));//無限循環的poll數據while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} }

2.2 手動提交offset

? 雖然自動提交offset十分簡潔便利,但由于其是基于時間提交的,開發人員難以把握offset提交的時機,因此Kafka還提供了手動提交offset的API。

手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次poll的一批數據最高的偏移量提交;不同點是,commitSync阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而commitAsync則沒有失敗重試機制,故有可能提交失敗。

2.2.1 同步提交offset

由于同步提交offset有失敗重試機制,故更加可靠,以下為同步提交offset的示例。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服務器props.put("bootstrap.servers", "yxp:9092");//組idprops.put("group.id", "test");//關閉自動提交props.put("enable.auto.commit", "false");//自動提交間隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消費完之后同步提交offset偏移量consumer.commitSync();}} }

2.2.2 異步提交offset

雖然同步提交offset更可靠一些,但是由于其會阻塞當前線程,直到提交成功。因此吞吐量會收到很大的影響。因此更多的情況下,會選用異步提交offset的方式。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition;import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服務器props.put("bootstrap.servers", "yxp:9092");//組idprops.put("group.id", "test");//關閉自動提交props.put("enable.auto.commit", "false");//自動提交間隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消費完之后異步提交offset偏移量consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){exception.printStackTrace();}}});}} }

2.3 數據漏消費和重復消費分析

無論是同步提交還是異步提交offset,都有可能會造成數據的漏消費或者重復消費。先提交offset后消費,有可能造成數據的漏消費而先消費后提交offset,有可能會造成數據的重復消費。


3.自定義Interceptor

3.1 攔截器原理

? Producer攔截器(Interceptor)主要用于實現clients端的定制化控制邏輯。

? 對于Producer而言,Interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個Interceptor按序作用于同一條消息從而形成一個攔截鏈(Interceptor chain)。攔截器需要實現ProducerInterceptor接口,其定義的方法包括:

(1)configure(configs):

獲取配置信息和初始化數據時調用。

(2)onSend(ProducerRecord)

該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目標分區的計算。

(3)onAcknowledgement(RecordMetadata, Exception)

該方法會在消息從RecordAccumulator成功發送到Kafka Broker之后,或者在發送過程中失敗時調用。并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率。

(4)close

關閉interceptor,主要用于執行一些資源清理工作.

如前所述,interceptor可能被運行在多個線程中,因此在具體實現時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非向上傳遞。這在使用過程中要特別留意。

3.2 攔截器案例


3.2.1 需求

實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發送后更新成功發送消息數或失敗發送消息數。

3.2.2 增加時間戳攔截器

TimeInterceptor.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class TimeInterceptor implements ProducerInterceptor<String,String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {//新構建一個ProducerRecorder對象(在值屬性那里將時間戳加在原值前面)return new ProducerRecord<>(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+","+ record.value());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }

3.2.3 計數攔截器

CounterInterceptor.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:統計發送消息的成功和失敗數*/ public class CounterInterceptor implements ProducerInterceptor<String,String > {private int errorCounter=0;private int successCounter=0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {//統計成功和失敗的次數if (exception==null){successCounter++;}else {errorCounter++;}}@Overridepublic void close() {System.out.println("Success sent:"+successCounter);System.out.println("Error sent:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {} }

3.2.4 Producer主程序

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.ArrayList; import java.util.Properties;/*** @Author : 尤小鵬* 切忌一味模仿!* 2022/1/6/006* description:*/ public class InterceptorProducer {public static void main(String[] args) {//1.設置配置信息Properties props = new Properties();props.put("bootstrap.servers", "yxp:9092");props.put("acks", "all");props.put("retries", 3);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//2.構建攔截鏈ArrayList<String> interceptors = new ArrayList<>();interceptors.add(TimeInterceptor.class.getName());interceptors.add(CounterInterceptor.class.getName());props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);KafkaProducer<String, String> producer = new KafkaProducer<>(props);//3.發送消息for (int i=0;i<10;i++){producer.send(new ProducerRecord<>("Topic_new","message"+i));}//4.關閉producerproducer.close();}}

3.2.5 測試

1.在kafka上啟動消費者

shell端命令:kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic Topic_new

2.然后運行客戶端java程序。

3.觀察消費者接受到的信息

總結

以上是生活随笔為你收集整理的Kafka的入门级API应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。