日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka消费者接收分区测试

發布時間:2023/12/3 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka消费者接收分区测试 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【README】

  • 本文演示了當有新消費者加入組后,其他消費者接收分區情況;
  • 本文還模擬了 broker 宕機的情況;
  • 本文使用的是最新的 kafka3.0.0 ;
  • 本文測試案例,來源于 消費者接收分區的5種模型,建議先看模型,refer2? ?https://blog.csdn.net/PacosonSWJTU/article/details/121853461https://blog.csdn.net/PacosonSWJTU/article/details/121853461

【1】kafka測試環境準備

1)kafka集群?

  • 3個broker,分別為 centos201, centos202, centos203 ,id分別為 1,2,3 ;
  • topic, 3個分區,2個副本;

?2)生產者代碼;

public class MyProducer {public static void main(String[] args) {/* 1.創建kafka生產者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");/*3.ack應答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次發送多少數據,當數據大于16k,生產者會發送數據到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待時間, 等待時間超過1毫秒,即便數據沒有大于16k, 也會寫數據到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超時時間props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 緩沖區大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化類 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 設置壓縮算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/** 設置攔截器 */ // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 設置阻塞超時時間 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.創建生產者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.發送數據 */int order = 1;for (int i = 0; i < 100000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10",j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生產者] 分區【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}/* 11.關閉資源 */producer.close();System.out.println("kafka生產者寫入數據完成");} }

生產者,會向每個分區發送1條消息,發送完成后,睡眠500ms; 共計循環 10w次; 共計5w秒;計劃耗時 10+小時;(這里其他同學可以自行設置為其他值)

3)4個消費者;編號為1,2,3,4

public class MyConsumer1 {public static void main(String[] args) {/* 1.創建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092,centos202:9092,centos203:9092");/*2.2開啟自動提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動提交的間隔時間*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello10G1"); // group.id/*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 默認值是 lastest/*2.7 關閉自動提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/* 創建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("hello10"));/* 指定消費者的每個分區從偏移量1開始讀取,下面的poll方法就會從位置1開始消費消息 */ // for (TopicPartition partition : consumer.assignment()) { // consumer.seek(partition, 1); // }// 消費消息try {// 死循環while(!Thread.interrupted()) {try {System.out.println(DateUtils.getNowTimestamp() + " 消費者1-等待消費消息");TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消費消息-獲取數據ConsumerRecords<String, String> consumerRds = consumer.poll(100);// 遍歷 ConsumerRecordsfor(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消費者1-分區【" + rd.partition() + "】offset【" + rd.offset() + "】 -> " + DateUtils.getNowTimestamp() + rd.key() + "--" + rd.value());}consumer.commitSync(); // 同步提交}} finally {// 記得關閉消費者consumer.close();}} }

這樣的消費者有4個,分別編號為 消費者 1,2,3,4 ;我的意思是4個不同的消費者類,以便打印日志標識;

我的消費者消費的是 lastest 最新產生的消費,這里可以自行設置為其他值,如 earlies;

4)添加日志配置,不打印 debug日志(因為kafka消費者debug日志很多)

新建 logback.xml ,設置僅打印info以上級別日志;

<?xml version="1.0" encoding="UTF-8"?> <configuration><logger name="org.apache.kafka.clients" level="info" /> </configuration>

5)為了直觀展示消費詳情,我會用命令行啟動4個不同消費者,而用idea啟動生產者;但編譯都通過maven;


【2】kafka測試

【2.1】測試1:當有新消費者加入后,整個消費者組成員接收分區情況;?

寫在前面: 文末會po出命令行啟動消費者的命令及參數;

消費者接收分區消息模型,參見

step0)啟動 生產者,發送消息到kafka;

step2)命令行啟動消費者1,消息消費日志如下:

消費者1接收了3個分區消息;?

?step2)命令行啟動消費者2,群組消費日志如下:

消費者1接收了個分區2消息;

消費者2接收了分區0和分區2的消息;

??step3)命令行繼續啟動消費者3,群組消費日志如下:

消費者1接收了個分區2消息;
消費者2接收了分區0的消息;
消費者3接收了分區1的消息;

?step4)命令行繼續啟動消費者4, 日志如下:

消費者1接收了個分區2消息;
消費者2接收了分區0的消息;
消費者3接收了分區1的消息;

消費者4空閑;

?


【2】 模擬kafka broker 宕機

寫在前面,模擬宕機前先查看 topic 詳情

(圖1)

step1) 停止掉 201 broker的服務

情況1:topic的分區沒有受影響,但leader 副本選舉為3,比較本圖和圖1,看差別;?

?情況2:所有消費者全部阻塞,直到超時全部拋出異常;

等待 kafka集群的控制器,首領副本選擇完成后,又可以接收消費者請求;?

  • 補充1: 這里有一小段時間延時,即當有broker宕機后,需要重新選舉控制器,首領副本等;而且會發生分區再均衡

?step2)重啟 201;消費日志:如下:

消費者1接收了個分區1消息;
消費者2接收了分區2的消息;
消費者3空閑;
消費者4接收了分區0的消息;

?之所以 消費者3空閑,消費者4忙碌,是因為 broker 動態上下線,導致了分區再均衡使得分區所有權從消費者A轉到消費者B(201宕機前,是消費者3忙碌,消費者4空閑);

【小結】

1,要保證kafka消息可靠性,需要 生產者,broker,消費者3方的全力配合;

2,本文這里僅記錄了一部分 kafka集群異常的情況;


【附錄】

命令行啟動消費者命令及參數;僅供參考;因為路徑肯定不一樣;

其實,這是拷貝idea的執行日志里的命令,如下:

?

java -classpath D:\Java\jdk1.8.0_172\jre\lib\charsets.jar;D:\Java\jdk1.8.0_172\jre\lib\deploy.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_172\jre\lib\javaws.jar;D:\Java\jdk1.8.0_172\jre\lib\jce.jar;D:\Java\jdk1.8.0_172\jre\lib\jfr.jar;D:\Java\jdk1.8.0_172\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_172\jre\lib\jsse.jar;D:\Java\jdk1.8.0_172\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_172\jre\lib\plugin.jar;D:\Java\jdk1.8.0_172\jre\lib\resources.jar;D:\Java\jdk1.8.0_172\jre\lib\rt.jar;D:\workbench_idea\study4vw\vwstudy22\target\classes;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.5.4\spring-boot-starter-web-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.4\spring-boot-starter-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot\2.5.4\spring-boot-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.4\spring-boot-autoconfigure-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.4\spring-boot-starter-logging-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-classic\1.2.5\logback-classic-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-core\1.2.5\logback-core-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.5.4\spring-boot-starter-json-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.5.4\spring-boot-starter-tomcat-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.52\tomcat-embed-core-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.52\tomcat-embed-el-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.52\tomcat-embed-websocket-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\kafka\kafka-clients\3.0.0\kafka-clients-3.0.0.jar;D:\software_cluster\mvn_repo\.m2\repository\com\github\luben\zstd-jni\1.5.0-2\zstd-jni-1.5.0-2.jar;D:\software_cluster\mvn_repo\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.1\snappy-java-1.1.8.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-simple\1.7.25\slf4j-simple-1.7.25.jar kafka.consumer.MyConsumer2

總結

以上是生活随笔為你收集整理的kafka消费者接收分区测试的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。