日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

springboot + kafka

發布時間:2023/12/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot + kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這片文章中我們來集成下常用的消息隊列(MQ)kafka,至于消息隊列的作用,在此不再贅述,參考前面的文章。

在該篇文章中我沒有采用配置文件的形式(application-dev.properties)配置,而是手動編寫的kafkaProduce和kafkaConsumer的配置,這樣更靈活。

注:基于demo-springboot

1.打開pom.xml加入以下內容:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <!--低版本貌似存在問題:1.1.7--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.4.RELEASE</version> </dependency>

以上為引入spring-kafka的最新依賴包

?

2.編寫生產者的配置:kafkaProduceConfig

package com.example.demo.config;import com.google.common.collect.Maps; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;import java.util.Map;/*** @author xiaofeng* @version V1.0* @title: KafkaProducerConfig.java* @package: com.example.demo.config* @description: kafka生產者配置* @date 2018/4/2 0002 下午 3:49*/ @Configuration @EnableKafka public class KafkaProducerConfig {public Map<String, Object> producerConfigs() {Map<String, Object> props = Maps.newHashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());} }

?

3.編寫消費者的配置:kafkaConsumerConfig

package com.example.demo.config;import com.google.common.collect.Maps; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.Map;/*** @author xiaofeng* @version V1.0* @title: KafkaConfiguration.java* @package: com.example.demo.config* @description: kafka消費者配置* @date 2018/4/2 0002 下午 3:42*/ @Configuration @EnableKafka public class KafkaConsumerConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = Maps.newHashMap();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return propsMap;}@Beanpublic KafkaProperties.Listener listener() {return new KafkaProperties.Listener();}}


以上為kafka生產者和消費者的相關配置,至于各個配置選項我并沒有做詳細的解釋,相關童鞋可以自行查詢。

4.編寫生產者

package com.example.demo.kafka.sender;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /*** @author xiaofeng* @version V1.0* @title: TestKafkaSender.java* @package: com.example.demo.kafka.sender* @description: kafka生產者* @date 2018/4/2 0002 下午 3:31*/ @Component public class TestKafkaSender {@Autowiredprivate KafkaTemplate kafkaTemplate;@Value("${kafka.test.topic}")String testTopic;public void sendTest(String msg){kafkaTemplate.send(testTopic, msg);} }

5.編寫消費者

package com.example.demo.kafka.consumer;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** @author xiaofeng* @version V1.0* @title: FollowHisOrderConsumer.java* @package: com.example.demo.kafka.consumer* @description: kafka消費者* @date 2018/4/2 0002 下午 3:31*/ @Component public class TestKafkaConsumer {Logger logger = LoggerFactory.getLogger(getClass());@KafkaListener(topics = {"${kafka.test.topic}"})public void consumer(String message) {logger.info(" message = " + message);System.out.printf("message=" + message);}}

最后我們就可以開始測試了,編寫單元測試類:

接下來自己去控制臺查看消費者是否已經接收到了相關數據吧!(不知道你有沒有收到,反正我是收到了^_^)

總結

以上是生活随笔為你收集整理的springboot + kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。