日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka动态配置topic

發(fā)布時間:2024/3/24 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka动态配置topic 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?之前使用@org.springframework.kafka.annotation.KafkaListener這個注解的時候,是在yml文件中配置,然后使用@KafkaListener(topics = {"${kafka.topic.a2b.name}"}),這樣去單獨監(jiān)聽某一個topic,生產(chǎn)者也固定在代碼里定義變量讀取配置文件。昨天改了個需求,希望以后通過配置文件去動態(tài)配置生產(chǎn)者和消費者的topic(不知道個數(shù)和topic名字),而不需要改代碼。

一、踩坑

?剛開始的時候,由于考慮不充分(沒有考慮到topic個數(shù)未知),想到@KafkaListener注解中的topics本身就是個字符串數(shù)組,于是想通過傳入變量的形式。產(chǎn)生了以下兩種方法:

1.傳入變量方法一

? 使用@Value注解提取配置文件中相關(guān)配置,@KafkaListener中傳入變量

public static String[] topicArr;@Value("${kafka.bootstrap.servers}")public void setTopicArr(String[] value){String topicArr = value;}@KafkaListener(topics= topicArr)

emmmm。。。結(jié)果可想而知,不行。

2.傳入變量方法二

?還是傳入變量,不過這次寫了個動態(tài)配置的代碼

注解里這么寫@KafkaListener(topics = "${topicName1}","${topicName2}","${topicName3}")提前將yml文件里添加topics: topicName1,topicName2,topicName3然后加載進來@Value("${kafka.topics}")public void setTopics(String value){topics = value;}動態(tài)配置代碼:@Configurationpublic class KafkaTopicConfiguration implements InitializingBean {@Autowiredprivate KafkaConfig kafkaconfig;@Overridepublic void afterPropertiesSet() throws Exception {String[] topicArr = kafkaconfig.split(",");int i = 1;for(String topic : topicArr){String topicName = "topicName"+i;System.setProperty(topicName, topic);}}}

相比方法一,可行。但是未知topic數(shù)量呢。GG。

3.不用注解

?百度找到幾個老哥的動態(tài)獲取并創(chuàng)建topic的方法

https://www.cnblogs.com/gaoyawei/p/7723974.html https://www.cnblogs.com/huxi2b/p/7040617.html https://blog.csdn.net/qq_27232757/article/details/78970830

寫了幾版,各種各樣的問題,還是我太菜。就想再看看有沒有別的簡單點的解決辦法,沒有了再回來搞這個。

4.正則匹配topic

?這期間又找到一個使用正則匹配topic的。直接貼鏈接。

@KafkaListener(topicPattern = "showcase.*") 這里使用正則匹配topic,其中【*】之前得加上【.】才能匹配到。

中間模仿寫了一版使用正則匹配的,其實也可以糊弄實現(xiàn)需求,除了topic取名的時候一定得規(guī)范以外,還得考慮到如果不想用某個topic了又得想怎么去避免他。
這種方法不太嚴謹,繼續(xù)踩坑吧。

二、問題解決

?用蹩腳的英語google了一下,嗯?好多老哥們也是用的以上差不多的方法。然而最后在某個老哥github的issues中看到了解決辦法。老哥的需求跟我差不多,感謝大佬,貼上最終問題解決方案。

1.kafka消費者監(jiān)聽配置

還是注解的形式 @KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")

讀取yml文件中kafka.listener_topics的參數(shù),然后根據(jù)“,”去split,得到一個topics數(shù)組。
這么做就可以根據(jù)配置文件動態(tài)的去監(jiān)聽topic。

2.yml配置文件

只列出topic相關(guān)部分(mqTypes是我用來判斷使用哪個topic發(fā)送的)kafka:listener_topics: kafka-topic-a2b,kafka-topic-c2bconsume:topic:- name: kafka-topic-a2bpartitions: 12replication_factor: 2- name: kafka-topic-c2bpartitions: 12replication_factor: 2product:topic:- name: kafka-topic-b2apartitions: 12replication_factor: 2mqTypes: type1- name: kafka-topic-b2cpartitions: 12replication_factor: 2mqTypes: type1

3.yml參數(shù)解析

這里我將kafka的topic相關(guān)加載到bean中處理。
創(chuàng)建KafkaConsumerBean和KafkaProducerBean分別用來存儲yml中生產(chǎn)者和消費者的topic相關(guān)參數(shù)

//KafkaConsumerBean @Component @ConfigurationProperties(prefix = "kafka.consume") public class KafkaConsumerBean {private List<Map<String,String>> topic;public void setTopic(List<Map<String, String>> topic) {this.topic = topic;}public List<Map<String, String>> getTopic() {return topic;} }//KafkaProducerBean @Component @ConfigurationProperties(prefix = "kafka.product") public class KafkaProducerBean {private List<Map<String,String>> topic;public void setTopic(List<Map<String, String>> topic) {this.topic = topic;}private Map<String,String> mqType2NameMap = new HashMap<String,String>();public List<Map<String, String>> getTopic() {return topic;}public String getTopic(String mqType){String name = mqType2NameMap.get(mqType);if(name != null){return name;}else{for(Map<String,String> topicProperty : topic){if (topicProperty.get("mqTypes").indexOf(mqType) >= 0){name = topicProperty.get("name");mqType2NameMap.put(mqType,name);return name;}}return null;}} }

4.創(chuàng)建topic

List<Map<String,String>> producerTopicList = kafkaProducerBean.getTopic();for (Map<String,String> topicProperty : producerTopicList){KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));}List<Map<String,String>> consumerTopicList = kafkaConsumerBean.getTopic();for (Map<String,String> topicProperty : consumerTopicList){KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));}

三、總結(jié)

?上面解決問題的方法關(guān)鍵在于

@KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")

@KafkaListener這個注解會去讀取spring的yml配置文件中

kafka:listener_topics: kafka-topic-a2b,kafka-topic-c2b

這塊listener_topics配置信息,然后通過’,'分割成topic數(shù)組,KafkaListener注解中的 topics 參數(shù),本身就是個數(shù)組,如下

// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) //package org.springframework.kafka.annotation;import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.messaging.handler.annotation.MessageMapping;@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(KafkaListeners.class) public @interface KafkaListener {String id() default "";String containerFactory() default "";String[] topics() default {};String topicPattern() default "";TopicPartition[] topicPartitions() default {};String group() default ""; }

?結(jié)合我之前的kafka文章,應(yīng)該是可以拼出一套成型的。

總結(jié)

以上是生活随笔為你收集整理的kafka动态配置topic的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。