
Spring5.0 Kafka2.11

Published: 2024/9/16

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.taian</groupId>
        <artifactId>taian-File2DB</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../taianFile2DB</relativePath>
    </parent>

    <groupId>org.taian</groupId>
    <artifactId>file2DB-service</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

spring-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Producer parameters -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="172.16.247.100:9092" />
                <!-- group.id is a consumer setting; the producer does not use it -->
                <entry key="group.id" value="0" />
                <entry key="retries" value="10" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.IntegerSerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- ProducerFactory bean required by the KafkaTemplate -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- KafkaTemplate bean: inject it wherever needed and call its send methods to publish messages -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test1" />
    </bean>
</beans>
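The XML above builds a plain map of settings and hands it to the producer factory. For readers who prefer Java configuration, the same map could be sketched as follows. The class and method names (ProducerConfigSketch, producerProperties) are hypothetical and not part of the original project; the values are copied from the XML as strings, and group.id is left out on the assumption that it is not needed on the producer side.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
public class ProducerConfigSketch {

    /** Builds the producer settings from spring-producer.xml as a plain map. */
    public static Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "172.16.247.100:9092");
        // group.id from the XML is intentionally omitted: it is a consumer setting.
        props.put("retries", "10");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the XML by eye.
        producerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With spring-kafka on the classpath, a map like this can be passed to the DefaultKafkaProducerFactory constructor, which is what the XML bean definition does with the producerProperties bean.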

KafkaProducerTest.java

package org.taian;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:spring/spring-producer.xml")
public class KafkaProducerTest {

    // Injected by type from the KafkaTemplate bean defined in spring-producer.xml
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @Test
    public void testTemplateSend() {
        // Sends one message (no key) to topic "test1"
        kafkaTemplate.send("test1", "www.aaaaaa.com");
    }
}

spring-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Consumer parameters -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="172.16.247.100:9092" />
                <entry key="group.id" value="0" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="1000" />
                <entry key="session.timeout.ms" value="15000" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- ConsumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes the messages -->
    <bean id="messageListernerConsumerService" class="org.taian.KafkaConsumerListener" />

    <!-- Consumer container configuration: topic to subscribe to and listener to call -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test1" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>

    <!-- Message listener container: polls the topic and passes each record to the listener -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>
</beans>

KafkaConsumerTest.java

package org.taian;

import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class KafkaConsumerTest {

    public static void main(String[] args) {
        try {
            // Loading the context creates the listener container defined in spring-consumer.xml
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:spring/spring-consumer.xml");
            context.start();
        } catch (BeansException e) {
            e.printStackTrace();
        }

        // Block the main thread indefinitely so the consumer keeps running
        synchronized (KafkaConsumerTest.class) {
            while (true) {
                try {
                    KafkaConsumerTest.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
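The main method above keeps the process alive by waiting on the class monitor in an endless loop, which leaves no clean way to stop it. One possible alternative, shown here only as a sketch, is to block on a CountDownLatch that another thread can release. The class and method names (KeepAliveSketch, requestStop, awaitStop) are hypothetical and not part of the original project, and the demo thread in main merely stands in for whatever would trigger the stop in a real application (for example a JVM shutdown hook, which is an assumption here).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original project.
public class KeepAliveSketch {

    private final CountDownLatch stopSignal = new CountDownLatch(1);

    /** Releases every thread currently blocked in awaitStop. */
    public void requestStop() {
        stopSignal.countDown();
    }

    /**
     * Waits up to timeoutMillis for a stop request.
     * Returns true if a stop was requested, false on timeout or interruption.
     */
    public boolean awaitStop(long timeoutMillis) {
        try {
            return stopSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeepAliveSketch keepAlive = new KeepAliveSketch();

        // Demo only: a second thread requests the stop after a short delay.
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            keepAlive.requestStop();
        });
        stopper.start();

        // Keep the main thread parked until a stop is requested or it is interrupted.
        while (!keepAlive.awaitStop(1000) && !Thread.currentThread().isInterrupted()) {
            // still running
        }
        System.out.println("stop requested, exiting");
    }
}
```

In the consumer application, the Spring context would be created before the wait loop and closed after it; that wiring is left out so the sketch runs with the JDK alone.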

KafkaConsumerListener.java

package org.taian;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class KafkaConsumerListener implements MessageListener<Integer, String> {

    // Called by the listener container for each record received from the topic
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println(record);
    }
}
