當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot2.1.9 多Kafka消费者配置
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot2.1.9 多Kafka消费者配置
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、配置文件
pom.xml
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>application.yml
spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test?
二、配置Configuration Bean
(1)第一個kafka配置
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手動提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}(2)第二個kafka配置(主)
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必須指定一個默認的消費者工廠@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必須指定一個默認的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}(3)kakfka配置導入
@Configuration @Import({OneKafkaConfig.class, TwoKafkaConfig.class}) public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}(4) 消費者監聽
public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的SpringBoot2.1.9 多Kafka消费者配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 你的办公室WiFi太慢办公室网速太慢
- 下一篇: gradle idea java ssm