日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka再均衡监听器测试

發(fā)布時(shí)間:2023/12/3 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka再均衡监听器测试 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

【README】

  • 本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3個(gè)節(jié)點(diǎn)分別是 centos201, centos202, centos203 ; brokerid 分別為 1,2,3;
  • 本文主要用于測試 再均衡監(jiān)聽器;當(dāng)有新消費(fèi)者加入時(shí),會(huì)發(fā)生分區(qū)再均衡;再均衡前就會(huì)調(diào)用再均衡監(jiān)聽器的 onPartitionsRevoked()方法;
  • 本文的測試主題 hello12,有3個(gè)分區(qū),每個(gè)分區(qū)2個(gè)副本;


【1】再均衡監(jiān)聽器

1)應(yīng)用場景?

在消費(fèi)者退出或進(jìn)行分區(qū)再均衡前,會(huì)做一些清理工作,如提交偏移量,或關(guān)閉數(shù)據(jù)庫連接;這些工作可以通過監(jiān)聽器來實(shí)現(xiàn);

2)再均衡監(jiān)聽器 ConsumerRebalanceListener

實(shí)現(xiàn) 該監(jiān)聽器即可,有3個(gè)方法

  • onPartitionsRevoked:在分區(qū)均衡開始【前】和消費(fèi)者停止讀取消息【后】被調(diào)用;
  • onPartitionsAssigned:分區(qū)再均衡【后】和消費(fèi)者開始讀取消息【前】被調(diào)用 ;
  • onPartitionsLost:分區(qū)宕機(jī)時(shí)調(diào)用(本文不涉及);
  • /*** @Description 消費(fèi)者分區(qū)再均衡監(jiān)聽器實(shí)現(xiàn)類* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/ public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener {/** 消費(fèi)者 */private Consumer consumer;/** 主題分區(qū)偏移量數(shù)組 */private MyTopicPartitionOffset[] topicPartitionOffsetArr;/*** @description 構(gòu)造器* @param consumer 消費(fèi)者* @param curOffsets 當(dāng)前偏移量* @author xiao tang* @date 2021/12/11*/public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) {this.consumer = consumer;this.topicPartitionOffsetArr = topicPartitionOffsetArr;}/** * @description 在分區(qū)均衡開始【前】和消費(fèi)者停止讀取消息【后】被調(diào)用* @param partitions 分區(qū)列表(分區(qū)號從0開始計(jì)數(shù))* @author xiao tang* @date 2021/12/12 */@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("=== 分區(qū)再均衡觸發(fā)onPartitionsRevoked()方法");// 提交偏移量回調(diào)(或記錄錯(cuò)誤日志)OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallbackImpl();// 打印日志Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x->System.out.printf("提交偏移量信息,partition【%d】offset【%s】\n", x.partition(), x.offset()));// 把數(shù)組轉(zhuǎn)為主題分區(qū)與偏移量映射,并提交最后一次處理的偏移量 (可以異步,也可以同步)// 同步提交一直重試或報(bào)超時(shí)異常// 異步提交傳入提交回調(diào),失敗自行處理consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback);// 停止程序的原因在于做實(shí)驗(yàn),下次從本次提交的偏移量開始消費(fèi)throw new RuntimeException("發(fā)生分區(qū)再均衡,程序結(jié)束");}/*** @description 分區(qū)再均衡【后】和消費(fèi)者開始讀取消息【前】被調(diào)用* @param partitions 主題分區(qū)列表* @author xiao tang* @date 2021/12/12*/@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// do sth}@Overridepublic void onPartitionsLost(Collection<TopicPartition> partitions) {ConsumerRebalanceListener.super.onPartitionsLost(partitions);} }

    為了測試,分區(qū)再均衡監(jiān)聽器中,onPartitionsRevoked() 方法提交最后已消費(fèi)消息的偏移量后,就拋出運(yùn)行時(shí)異常結(jié)束運(yùn)行,讓其他消費(fèi)者消費(fèi)以便查看監(jiān)聽器是否成功提交偏移量;

    3)消費(fèi)者工具 ?

    /*** @Description 消費(fèi)者工具* @author xiao tang* @version 1.0.0* @createTime 2021年12月12日*/ public enum MyConsumerUtils {/** 單例 */INSTANCE;/*** @description 獲取主題分區(qū)與偏移量映射* @param topicPartitionOffsetArr 主題分區(qū)與偏移量數(shù)組* @return 主題分區(qū)與偏移量映射* @author xiao tang* @date 2021/12/12*/public static Map<TopicPartition, OffsetAndMetadata> getTopicPartitionOffsetMetadataMap(MyTopicPartitionOffset[] topicPartitionOffsetArr) {// 主題分區(qū)與偏移量映射Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMetadataMap = new HashMap<>(topicPartitionOffsetArr.length);// 分區(qū)號大于-1,才是消費(fèi)者接收的分區(qū)Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x -> {topicPartitionOffsetMetadataMap.put(new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), "no metadata"));});return topicPartitionOffsetMetadataMap;} }


    【2】生產(chǎn)者?

    /*** @Description 生產(chǎn)者* @author xiao tang* @version 1.0.0* @createTime 2021年12月03日*/ public class MyProducer {/** 主題 */public final static String TOPIC_NAME = "hello12";public static void main(String[] args) {/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");/*3.ack應(yīng)答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數(shù)*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次發(fā)送多少數(shù)據(jù),當(dāng)數(shù)據(jù)大于16k,生產(chǎn)者會(huì)發(fā)送數(shù)據(jù)到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待時(shí)間, 等待時(shí)間超過1毫秒,即便數(shù)據(jù)沒有大于16k, 也會(huì)寫數(shù)據(jù)到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超時(shí)時(shí)間props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 緩沖區(qū)大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化類 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 設(shè)置壓縮算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/** 設(shè)置攔截器 */ // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 設(shè)置阻塞超時(shí)時(shí)間 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.創(chuàng)建生產(chǎn)者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.發(fā)送數(shù)據(jù) */int order = 1;for (int i = 0; i < 10000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生產(chǎn)者] 分區(qū)【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}}/* 11.關(guān)閉資源 */producer.close();System.out.println("kafka生產(chǎn)者寫入數(shù)據(jù)完成");} }

    ?【3】消費(fèi)者

    【3.1】帶有均衡監(jiān)聽器的消費(fèi)者1

    /*** @Description 帶有均衡監(jiān)聽器的消費(fèi)者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/ public class MyConsumerWithRebalanceListener {public static void main(String[] args) {// 創(chuàng)建消費(fèi)者配置信息Properties props = new Properties();// 屬性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");// 關(guān)閉自動(dòng)提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 設(shè)置消費(fèi)消息的位置,消費(fèi)最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 設(shè)置分區(qū)策略 (默認(rèn)值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 創(chuàng)建消費(fèi)者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);int partitionSize = consumer.partitionsFor(MyProducer.TOPIC_NAME).size();// 創(chuàng)建分區(qū)偏移量數(shù)組并初始化 (僅考慮一個(gè)topic的情況)MyTopicPartitionOffset[] topicPartitionOffsetArr = new MyTopicPartitionOffset[partitionSize];IntStream.range(0, partitionSize).forEach(x -> topicPartitionOffsetArr[x] = new MyTopicPartitionOffset());// 訂閱主題, 【傳入分區(qū)再均衡監(jiān)聽器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));// 循環(huán)拉取try {while (!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 帶均衡監(jiān)聽器消費(fèi)者等待消費(fèi)消息");TimeUtils.sleep(1000);// 消費(fèi)消息ConsumerRecords<String, String> consumerRds = consumer.poll(100);System.out.println("poll begin {");for (ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消費(fèi)者-WithRebalanceListener-分區(qū)【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());// 提交的偏移量,是 當(dāng)前消息偏移量加1topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() + 1);}System.out.println("poll end } ");// 【異步提交每個(gè)分區(qū)的偏移量】consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl());}} finally {try {// 【同步提交】 因?yàn)殄e(cuò)誤時(shí),同步提交會(huì)一直重試,直到提交成功或發(fā)生無法恢復(fù)的錯(cuò)誤consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr));} finally {// 記得關(guān)閉消費(fèi)者consumer.close();}}} }

    【3.2】 不帶均衡監(jiān)聽器的消費(fèi)者2 (測試用)

    即一個(gè)普通消費(fèi)者;

    /*** @Description 不帶均衡監(jiān)聽器的消費(fèi)者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/ public class MyConsumerNoRebalanceListener {public static void main(String[] args) {// 創(chuàng)建消費(fèi)者配置信息Properties props = new Properties();// 屬性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");// 關(guān)閉自動(dòng)提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 設(shè)置消費(fèi)消息的位置,消費(fèi)最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 設(shè)置分區(qū)策略 (默認(rèn)值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 創(chuàng)建消費(fèi)者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱主題, 【沒有分區(qū)再均衡監(jiān)聽器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));// 循環(huán)拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 沒有均衡監(jiān)聽器的消費(fèi)者等待消費(fèi)消息");TimeUtils.sleep(1000);// 消費(fèi)消息ConsumerRecords<String, String> consumerRds = consumer.poll(100);for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消費(fèi)者-NoRebalanceListener-分區(qū)【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());}// 【異步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());if (!consumerRds.isEmpty()) break;}} finally {try {// 【同步提交】 因?yàn)殄e(cuò)誤時(shí),同步提交會(huì)一直重試,直到提交成功或發(fā)生無法恢復(fù)的錯(cuò)誤consumer.commitSync();} finally {// 記得關(guān)閉消費(fèi)者consumer.close();System.out.println("消費(fèi)者關(guān)閉");}}} }

    ?我們可以發(fā)現(xiàn),一旦消費(fèi)者2消費(fèi)了消息(消息不為空),就停止消費(fèi);

    以便我們查看消費(fèi)者2接收消息的偏移量是不是 消費(fèi)者1的監(jiān)聽器在發(fā)生分區(qū)再均衡前提交的偏移量+1;


    【4】測試

    【4.1】測試步驟

    step1) 運(yùn)行生產(chǎn)者,發(fā)送消息到kafka;

    step2) 運(yùn)行 帶有均衡監(jiān)聽器的消費(fèi)者1 MyConsumerWithRebalanceListener, 消費(fèi)消息;

    在消費(fèi)者訂閱主題時(shí),傳入再均衡監(jiān)聽器;

    // 訂閱主題, 【傳入分區(qū)再均衡監(jiān)聽器】 consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME) , new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));

    step3)運(yùn)行 不帶均衡監(jiān)聽器的消費(fèi)者2 MyConsumerNoRebalanceListener,消費(fèi)消息;

    一旦消費(fèi)者2加入消費(fèi)者組,就會(huì)發(fā)生分區(qū)再均衡,消費(fèi)者1的某些分區(qū)所有權(quán)會(huì)轉(zhuǎn)給消費(fèi)者2,觸發(fā)消費(fèi)者1 的 監(jiān)聽器 ConsumerBalanceListenerImpl 的 onPartitionsRevoked() 方法;

    • 然后 onPartitionsRevoked()方法提交 消費(fèi)者1處理的消息的偏移量后,就原地拋出異常停止運(yùn)行;

    【4.2】測試結(jié)果分析

    1)消費(fèi)者1(帶分區(qū)再均衡監(jiān)聽器)的監(jiān)聽器最后提交的偏移量日志如下:

    2021-12-12 10:23:30 > 帶均衡監(jiān)聽器消費(fèi)者等待消費(fèi)消息
    === 分區(qū)再均衡觸發(fā)onPartitionsRevoked()方法
    提交偏移量信息,partition【0】offset【1296】
    提交偏移量信息,partition【1】offset【1269】
    提交偏移量信息,partition【2】offset【1269】

    2)消費(fèi)者2接收到的起始消息的偏移量日志如下(全部):

    2021-12-12 10:23:27 > 沒有均衡監(jiān)聽器的消費(fèi)者等待消費(fèi)消息
    2021-12-12 10:23:32 > 沒有均衡監(jiān)聽器的消費(fèi)者等待消費(fèi)消息
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1296】值=[589]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1297】值=[592]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1298】值=[595]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1299】值=[598]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1300】值=[601]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1301】值=[604]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1302】值=[607]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1303】值=[610]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1304】值=[613]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1269】值=[510]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1270】值=[513]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1271】值=[516]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1272】值=[519]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1273】值=[522]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1274】值=[525]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1275】值=[528]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1276】值=[531]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1277】值=[534]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1269】值=[509]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1270】值=[512]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1271】值=[515]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1272】值=[518]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1273】值=[521]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1274】值=[524]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1275】值=[527]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1276】值=[530]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1277】值=[533]? > ABCDE
    消費(fèi)者關(guān)閉

    即 監(jiān)聽器提交的偏移量為:

    partition【0】offset【1296】
    partition【1】offset【1269】
    partition【2】offset【1269】

    而普通消費(fèi)者接收消息的起始偏移量為

    消費(fèi)者-NoRebalanceListener-分區(qū)【0】offset【1296】值=[589]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【2】offset【1269】值=[510]? > ABCDE
    消費(fèi)者-NoRebalanceListener-分區(qū)【1】offset【1269】值=[509]? > ABCDE

    所以,偏移量是可以對上的;即再均衡監(jiān)聽器在發(fā)生分區(qū)再均衡前提交的消息偏移量后, 其他消費(fèi)者可以接收該偏移量指定的消息;

    所以,本次再均衡監(jiān)聽器測試是成功的


    【注意】

    注意1)監(jiān)聽器提交的偏移量是接收消息的當(dāng)前偏移量+1;(注意要加1,非常重要),即加1后的偏移量作為其他消費(fèi)者輪序消費(fèi)的起始位置;

    • 代碼:偏移量+1參見? MyConsumerWithRebalanceListener 的 接收消息的循環(huán)中的代碼,如下:
    // 提交的偏移量,是 當(dāng)前消息偏移量加1 topicPartitionOffsetArr[rd.partition()].setAll( rd.topic(), rd.partition(), rd.offset() + 1);

    注意2)本文代碼參考了 《kafka權(quán)威指南》 page63,但書中代碼有問題

    • 在每次for循環(huán)中創(chuàng)建 TopicPartition對象和 OffsetAndMetadata對象,我覺得這是沒有必要的,因?yàn)橹挥忻總€(gè)分區(qū)的最后一次消息的 topic,partition,offset,是有用的,但它每次循環(huán)都創(chuàng)建了對象,而且把 currentOffsets 放在了while循環(huán)外面,這樣肯定會(huì)造成 oom;(本文僅記錄了 topic,partition,offset,而沒有創(chuàng)建對象,這是本文的優(yōu)化點(diǎn)
    • 當(dāng)然了,原作者寫的是參考代碼,可以理解;但業(yè)務(wù)生產(chǎn)代碼絕對不能這樣寫;

    【References】

    • kakfa權(quán)威指南;

    總結(jié)

    以上是生活随笔為你收集整理的kafka再均衡监听器测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。