日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot2.1.9 多Kafka消费者配置

發布時間:2023/12/3 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot2.1.9 多Kafka消费者配置 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、配置文件

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>

application.yml

spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test

?

二、配置Configuration Bean

(1)第一個kafka配置

public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手動提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}

(2)第二個kafka配置(主)

public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必須指定一個默認的消費者工廠@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必須指定一個默認的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}

(3)kakfka配置導入

@Configuration @Import({OneKafkaConfig.class, TwoKafkaConfig.class}) public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}

(4) 消費者監聽

public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}

?

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的SpringBoot2.1.9 多Kafka消费者配置的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。