當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
Spring5.0 Kafka2.11
生活随笔
收集整理的這篇文章主要介紹了
Spring5.0 Kafka2.11
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
擼了今年阿里、頭條和美團的面試,我有一個重要發現.......>>>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the file2DB-service module.
  Inherits from the taian-File2DB parent and pulls in spring-kafka, an SLF4J/log4j
  binding, JUnit and spring-test; compiles for Java 1.8 with UTF-8 sources.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.taian</groupId>
        <artifactId>taian-File2DB</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../taianFile2DB</relativePath>
    </parent>

    <groupId>org.taian</groupId>
    <artifactId>file2DB-service</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Spring integration for Apache Kafka (KafkaTemplate, listener containers) -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>
        <!-- SLF4J binding that routes logging to log4j 1.2 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile sources and targets at Java 1.8, reading files as UTF-8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

spring-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for the Kafka producer side:
  a property map, a DefaultKafkaProducerFactory built from it,
  and a KafkaTemplate that uses the factory.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Producer configuration properties -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="172.16.247.100:9092" />
                <!-- NOTE(review): group.id is normally a consumer setting; confirm the producer needs it -->
                <entry key="group.id" value="0" />
                <entry key="retries" value="10" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.IntegerSerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- ProducerFactory bean required to create the KafkaTemplate -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- KafkaTemplate bean: inject it wherever messages need to be sent and call its send methods -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test1" />
    </bean>
</beans>

KafkaProducerTest.java
package org.taian;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Spring-driven JUnit 4 test that sends one message through the {@link KafkaTemplate}
 * defined in {@code spring/spring-producer.xml}.
 *
 * <p>The test only issues the send; it makes no assertion about delivery.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:spring/spring-producer.xml")
public class KafkaProducerTest {

    /** Template injected by type from the producer application context. */
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /** Sends a single string payload to the {@code test1} topic. */
    @Test
    public void testTemplateSend() {
        kafkaTemplate.send("test1", "www.aaaaaa.com");
    }
}

spring-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for the Kafka consumer side:
  a property map, a DefaultKafkaConsumerFactory built from it, the listener bean,
  the container properties for topic "test1", and the listener container itself.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Consumer configuration properties -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="172.16.247.100:9092" />
                <entry key="group.id" value="0" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="1000" />
                <entry key="session.timeout.ms" value="15000" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- ConsumerFactory bean built from the properties above -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes the messages -->
    <bean id="messageListernerConsumerService" class="org.taian.KafkaConsumerListener" />

    <!-- Consumer container configuration: topic to subscribe to and the listener to invoke -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test1" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>

    <!-- Listener container wired from the consumer factory and container properties;
         doStart is invoked as the bean's init-method -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>
</beans>

kafkaConsumerTest.java
package org.taian;

import org.springframework.beans.BeansException;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Stand-alone launcher for the Kafka consumer: loads {@code spring/spring-consumer.xml}
 * and keeps the main thread parked so the process stays alive.
 */
public class KafkaConsumerTest {

    /**
     * Starts the consumer application context and blocks until the thread is interrupted.
     *
     * <p>If the context cannot be created or started, the failure is printed and the method
     * returns instead of blocking forever on a context that never came up.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = null;
        try {
            context = new ClassPathXmlApplicationContext("classpath:spring/spring-consumer.xml");
            // Close the context on JVM shutdown (e.g. Ctrl+C) so its beans are destroyed.
            context.registerShutdownHook();
            context.start();
        } catch (BeansException e) {
            e.printStackTrace();
            if (context != null) {
                context.close();
            }
            return;
        }

        synchronized (KafkaConsumerTest.class) {
            while (true) {
                try {
                    // Nothing ever notifies this monitor; the loop only guards against spurious wakeups.
                    KafkaConsumerTest.class.wait();
                } catch (InterruptedException e) {
                    // Treat interruption as a stop request: restore the flag, release the context, exit.
                    Thread.currentThread().interrupt();
                    context.close();
                    return;
                }
            }
        }
    }
}

KafkaConsumerListener.java
package org.taian;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener;public class KafkaConsumerListener implements MessageListener<Integer, String> {@Overridepublic void onMessage(ConsumerRecord<Integer, String> record) {System.out.println(record);} }?
總結
以上是生活随笔為你收集整理的Spring5.0 Kafka2.11的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 定时器注入spring
- 下一篇: Ajax发送formdata数据,Spr