kafka 支持发布订阅
概述
一般消息隊(duì)列的是實(shí)現(xiàn)是支持兩種模式的,即點(diǎn)對(duì)點(diǎn),還有一種是topic發(fā)布訂閱者模式,比如ACTIVEMQ。KAFKA也支持這兩種模式,但是實(shí)現(xiàn)的原理不一樣。
KAFKA 的消息被讀取后,并不是馬上刪除,這樣就可以重復(fù)讀取。kafka 正式利用這種特性實(shí)現(xiàn)發(fā)布訂閱者模式。
即在發(fā)布消息的時(shí)候,發(fā)布一個(gè)topic,可以使用配置多個(gè)消費(fèi)者來(lái)消費(fèi),消費(fèi)者使用分組來(lái)實(shí)現(xiàn)。比如一個(gè)topic ,有兩個(gè)分組的消費(fèi)者訂閱。
那么發(fā)布一個(gè)消息的時(shí)候,兩個(gè)分組的消費(fèi)者可以讀取到此條消息。
實(shí)現(xiàn)
配置兩組消費(fèi)者。
分組1
<bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group1"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創(chuàng)建consumerFactory bean --><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean>注意 這個(gè)分組的ID 是 group1
分組2
<bean id="consumerProperties2" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group2"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創(chuàng)建consumerFactory bean --><bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties2"/></constructor-arg></bean>這里配置的分組是2 group2 。
我們使用代碼測(cè)試發(fā)布消息:
IMessageProducer producer= MessageUtil.getProducer();LogEntity ent=new LogEntity();ent.setId("000000001");ent.setIp("192.168.1.1");ent.setAction("test");producer.send("logMessageQueue", ent);return "1";我們?cè)诎l(fā)布一個(gè)消息的時(shí)候,兩個(gè)分組的消費(fèi)者都讀取到了這條消息,因此就實(shí)現(xiàn)了 發(fā)布訂閱者模式。
?
轉(zhuǎn)載于:https://www.cnblogs.com/yg_zhang/p/10194115.html
總結(jié)
以上是生活随笔為你收集整理的kafka 支持发布订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kali Linux 2019.1 发布
- 下一篇: 网络初识