日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器

發布時間:2025/3/21 java 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Java kafka如何實現自定義分區類和攔截器

2、producer配置文件指定,具體的分區類

// 具體的分區類

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor {

@Override

public void configure(Map configs) {

System.out.println("這是MessageInterceptor的configure方法");

}

/**

* 這個是消息發送之前進行處理

*

* @param record

* @return

*/

@Override

public ProducerRecord onSend(ProducerRecord record) {

// 創建一個新的record,把uuid入消息體的最前部

System.out.println("為消息添加uuid");

return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),

UUID.randomUUID().toString().replace("-", "") + "," + record.value());

}

/**

* 這個是生產者回調函數調用之前處理

* @param metadata

* @param exception

*/

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

System.out.println("MessageInterceptor攔截器的onAcknowledgement方法");

}

@Override

public void close() {

System.out.println("MessageInterceptor close 方法");

}

}

2、定義計數攔截器

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{

private int errorCounter = 0;

private int successCounter = 0;

@Override

public void configure(Map configs) {

System.out.println("這是CounterInterceptor的configure方法");

}

@Override

public ProducerRecord onSend(ProducerRecord record) {

System.out.println("CounterInterceptor計數過濾器不對消息做任何操作");

return record;

}

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

// 統計成功和失敗的次數

System.out.println("CounterInterceptor過濾器執行統計失敗和成功數量");

if (exception == null) {

successCounter++;

} else {

errorCounter++;

}

}

@Override

public void close() {

// 保存結果

System.out.println("Successful sent: " + successCounter);

System.out.println("Failed sent: " + errorCounter);

}

}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

public class Producer1 {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

// Kafka服務端的主機名和端口號

props.put("bootstrap.servers", "localhost:9092");

// 等待所有副本節點的應答

props.put("acks", "all");

// 消息發送最大嘗試次數

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 請求延時,可能生產數據太快了

props.put("linger.ms", 1);

// 發送緩存區內存大小,數據是先放到生產者的緩沖區

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 具體的分區類

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

//定義攔截器

List interceptors = new ArrayList<>();

interceptors.add("kafka.MessageInterceptor");

interceptors.add("kafka.CounterInterceptor");

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

Producer producer = new KafkaProducer<>(props);

for (int i = 0; i < 1; i++) {

producer.send(new ProducerRecord("test_0515", i + "", "xxx-" + i), new Callback() {

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

System.out.println("這是producer回調函數");

}

});

}

/*System.out.println("現在執行關閉producer");

producer.close();*/

producer.close();

}

}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

Java kafka如何實現自定義分區類和攔截器相關教程

總結

以上是生活随笔為你收集整理的java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 人人模人人干 | 91超碰在线免费观看 | 国产精品毛片va一区二区三区 | 日本免费一区二区三区四区五六区 | www.黄色在线观看 | 91视频免费在线 | 精品无码av在线 | 日韩黄色网址 | 神马伦理视频 | av免费观看网站 | 国产精品1024 | 午夜影院美女 | 黄色国产视频网站 | 国产欧美日韩综合精品 | 蜜桃网站 | 好看的黄色录像 | 成人夜色视频 | 尤物在线观看视频 | 嫩草影院懂你的 | 992tv在线成人免费观看 | 午夜免费福利影院 | 国产精品麻豆一区二区三区 | 天天综合在线视频 | 久久精品无码一区二区三区 | 成人免费高清在线播放 | 日韩精品一区二区不卡 | 国产精品卡一 | 国产精品69毛片高清亚洲 | av天天堂 | 亚洲激情二区 | 乳揉みま痴汉4在线播放 | 伊人色网 | 老司机av导航 | 欧美性猛交 xxxx | 久久精品美女视频 | 污片免费在线观看 | 欧美a√在线| 日本特级黄色大片 | 中文在线字幕观看 | 天堂av资源在线 | 美丽的小蜜桃2:美丽人生 | 久久精品av | 黄色无遮挡网站 | 波多野结衣视频播放 | 日本一二三不卡视频 | 打美女白嫩屁屁网站 | 青春草在线视频免费观看 | 在线观看欧美亚洲 | 亚洲 欧美 变态 另类 综合 | 91免费在线视频观看 | 日韩三级在线播放 | 美女调教视频 | 国产欧美二区 | 亚洲自拍偷拍区 | 国内精品久久久久久久久 | 国产人妻黑人一区二区三区 | 久久久久久国产精品三区 | 国产精品网站免费 | 2020亚洲男人天堂 | 日韩精品一区二区三区视频在线观看 | 色视频网址 | 欧美一区二区激情 | 一区二区三区蜜桃 | 美女av影院| 欧洲久久久久久 | 成人夜色视频 | 日皮视频免费观看 | 国产探花精品在线 | 超碰97在线播放 | 成人三级电影网站 | av在线日韩 | 大咪咪av| 91免费看大片 | 林雅儿欧洲留学恋爱日记在线 | 天堂va蜜桃一区 | 亚洲乱码日产精品bd在线观看 | 三级国产在线 | 91华人在线| 国产成人免费在线 | 久久艹综合 | 巨物撞击尤物少妇呻吟 | 精品成人av | 欧美日韩国产一区二区 | 嫩草视频入口 | 日韩黄色三级视频 | 亚洲是色 | 98视频在线 | 黄网站免费大全入口 | www.日本精品 | 国产亚洲精品成人av在线 | 一本一道久久a久久 | 成人免费一级视频 | 理伦毛片 | 超碰自拍| 久久新 | 国产精品久久久久久久久免费 | 国产成人精品一区二区在线观看 | 麻豆va| 国产九色在线播放九色 |