日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka 支持发布订阅

發(fā)布時(shí)間:2025/3/8 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 支持发布订阅 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

概述

一般消息隊(duì)列的是實(shí)現(xiàn)是支持兩種模式的,即點(diǎn)對(duì)點(diǎn),還有一種是topic發(fā)布訂閱者模式,比如ACTIVEMQ。KAFKA也支持這兩種模式,但是實(shí)現(xiàn)的原理不一樣。

KAFKA 的消息被讀取后,并不是馬上刪除,這樣就可以重復(fù)讀取。kafka 正式利用這種特性實(shí)現(xiàn)發(fā)布訂閱者模式。

即在發(fā)布消息的時(shí)候,發(fā)布一個(gè)topic,可以使用配置多個(gè)消費(fèi)者來(lái)消費(fèi),消費(fèi)者使用分組來(lái)實(shí)現(xiàn)。比如一個(gè)topic ,有兩個(gè)分組的消費(fèi)者訂閱。

那么發(fā)布一個(gè)消息的時(shí)候,兩個(gè)分組的消費(fèi)者可以讀取到此條消息。

實(shí)現(xiàn)

配置兩組消費(fèi)者。

分組1

<bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group1"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創(chuàng)建consumerFactory bean --><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean>

注意 這個(gè)分組的ID 是 group1

分組2

<bean id="consumerProperties2" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group2"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創(chuàng)建consumerFactory bean --><bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties2"/></constructor-arg></bean>

這里配置的分組是2 group2 。

我們使用代碼測(cè)試發(fā)布消息:

IMessageProducer producer= MessageUtil.getProducer();LogEntity ent=new LogEntity();ent.setId("000000001");ent.setIp("192.168.1.1");ent.setAction("test");producer.send("logMessageQueue", ent);return "1";

我們?cè)诎l(fā)布一個(gè)消息的時(shí)候,兩個(gè)分組的消費(fèi)者都讀取到了這條消息,因此就實(shí)現(xiàn)了 發(fā)布訂閱者模式。

?

轉(zhuǎn)載于:https://www.cnblogs.com/yg_zhang/p/10194115.html

總結(jié)

以上是生活随笔為你收集整理的kafka 支持发布订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。