日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis-13Redis发布订阅

發(fā)布時間:2025/3/21 数据库 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis-13Redis发布订阅 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • 消息多播
  • PubSub發(fā)布者訂閱者模型
  • 客戶端操作
  • Spring配置發(fā)布訂閱模式
  • pubsub不足之處
  • 代碼

概述

當(dāng)使用銀行卡消費(fèi)的時候,銀行往往會通過微信、短信或郵件通知用戶這筆交易的信
息,這便是一種發(fā)布訂閱模式, 1這里的發(fā)布是交易信息的發(fā)布,訂閱則是各個渠道。這在實際工作中十分常用, Redis 支持這樣的一個模式。

Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。觀察者模式就是這個模式的典型應(yīng)用。

Redis 客戶端可以訂閱任意數(shù)量的頻道。

下圖展示了頻道 channel1 , 以及訂閱這個頻道的三個客戶端 —— client2 、 client5 和 client1 之間的關(guān)系:

當(dāng)有新消息通過 PUBLISH 命令發(fā)送給頻道 channel1 時, 這個消息就會被發(fā)送給訂閱它的三個客戶端:


消息多播

消息多播允許生產(chǎn)者生產(chǎn)一次消息,中間件負(fù)責(zé)將消息復(fù)制到多個消息隊列,每個消息隊列由相應(yīng)的消費(fèi)組進(jìn)行消費(fèi)。

它是分布式系統(tǒng)常用的一種解耦方式,用于將多個消費(fèi)組的邏輯進(jìn)行拆分。

支持了消息多播,多個消費(fèi)組的邏輯就可以放到不同的子系統(tǒng)中。

如果是普通的消息隊列,就得將多個不同的消費(fèi)組邏輯串接起來放在一個子系統(tǒng)中,進(jìn)行連續(xù)消費(fèi)。


PubSub發(fā)布者訂閱者模型

為了支持消息多播,Redis單獨使用了一個模塊來支持消息多播,這個模塊的名字叫著 PubSub,也就是 PublisherSubscriber,發(fā)布者訂閱者模型。


客戶端操作

首先來注冊一個訂閱的客戶端 , 這個時候使用 SUBSCRIBE命令 。
比如監(jiān)昕一個叫作 talk 的渠道 , 這個時候我們需要先打開一個客戶端 ,這里記為客戶
端1 ,然后輸入命令

127.0.0.1:6379> SUBSCRIBE talk Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "talk" 3) (integer) 1

這個時候客戶端 1 就會訂閱了一個叫作 talk渠道的消息了

打開另外一個客戶端 ,記為客戶端 2訂閱 talk渠道的消息

127.0.0.1:6379> SUBSCRIBE talk Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "talk" 3) (integer) 1

最后打開另外一個客戶端,發(fā)布消息給這兩個訂閱者

127.0.0.1:6379> PUBLISH talk "redis world !!!" (integer) 2 127.0.0.1:6379>

觀察客戶端 1 和客戶端2 ,就可以發(fā)現(xiàn)已經(jīng)收到了消息 , 井有對應(yīng)的信息打印出來。


Spring配置發(fā)布訂閱模式

首先提供接收消息的類 , 它將實現(xiàn) org.springframework.data.redis.connection.MessageListener 接口, 并實現(xiàn)接口定義的方法 public void onMessage(Message message, byte[] pattern)

package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉(zhuǎn)換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener:" + channelStr);// 渠道名稱轉(zhuǎn)換String bytesStr = new String(bytes);System.out.println("RedisMessageListener:" + bytesStr);}} package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener2 implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉(zhuǎn)換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener2:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉(zhuǎn)換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener2:" + channelStr);// 渠道名稱轉(zhuǎn)換String bytesStr = new String(bytes);System.out.println("RedisMessageListener2:" + bytesStr);}}

為了在 Spring 中使用這兩個監(jiān)聽類,需要對其進(jìn)行配置。這樣就在 Spring 上下文中定義了監(jiān)昕類。

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:property-placeholder location="classpath:redis/redis.properties" /><!--2,注意新版本2.3以后,JedisPoolConfig的property name,不是maxActive而是maxTotal,而且沒有maxWait屬性,建議看一下Jedis源碼或百度。 --><!-- redis連接池配置 --><bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"><!--最大空閑數(shù) --><property name="maxIdle" value="${redis.maxIdle}" /><!--連接池的最大數(shù)據(jù)庫連接數(shù) --><property name="maxTotal" value="${redis.maxTotal}" /><!--最大建立連接等待時間 --><property name="maxWaitMillis" value="${redis.maxWaitMillis}" /><!--逐出連接的最小空閑時間 默認(rèn)1800000毫秒(30分鐘) --><property name="minEvictableIdleTimeMillis" value="${redis.minEvictableIdleTimeMillis}" /><!--每次逐出檢查時 逐出的最大數(shù)目 如果為負(fù)數(shù)就是 : 1/abs(n), 默認(rèn)3 --><property name="numTestsPerEvictionRun" value="${redis.numTestsPerEvictionRun}" /><!--逐出掃描的時間間隔(毫秒) 如果為負(fù)數(shù),則不運(yùn)行逐出線程, 默認(rèn)-1 --><property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}" /><property name="testOnBorrow" value="true"></property><property name="testOnReturn" value="true"></property><property name="testWhileIdle" value="true"></property></bean><!--redis連接工廠 --><bean id="jedisConnectionFactory"class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"destroy-method="destroy"><property name="poolConfig" ref="jedisPoolConfig"></property><!--IP地址 --><property name="hostName" value="${redis.host.ip}"></property><!--端口號 --><property name="port" value="${redis.port}"></property><!--如果Redis設(shè)置有密碼 --><property name="password" value="${redis.password}" /> <!--客戶端超時時間單位是毫秒 --><property name="timeout" value="${redis.timeout}"></property><property name="usePool" value="true" /><!--<property name="database" value="0" /> --></bean><!-- 鍵值序列化器設(shè)置為String 類型 --><bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/><!-- redis template definition --><bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"p:connection-factory-ref="jedisConnectionFactory"p:keySerializer-ref="stringRedisSerializer"p:valueSerializer-ref="stringRedisSerializer"></bean><!-- 自定義 發(fā)布訂閱監(jiān)聽類 --><bean id="redisMessageListener" class="com.artisan.redis.publish.RedisMessageListener"p:redisTemplate-ref="redisTemplate"/><bean id="redisMessageListener2" class="com.artisan.redis.publish.RedisMessageListener2"p:redisTemplate-ref="redisTemplate"/> <!-- 監(jiān)聽容器 --> <bean id="topicContainer"class="org.springframework.data.redis.listener.RedisMessageListenerContainer"destroy-method="destroy"><!--Redis 連接工廠 --><property name="connectionFactory" ref="jedisConnectionFactory"></property><!-- 連接池,這里只要線程池生存 , 才能繼續(xù)監(jiān)昕 --><property name="taskExecutor"><beanclass="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="3"></property></bean></property><!-- 消息監(jiān)聽 Map --><property name="messageListeners"><map><!--一配置監(jiān)聽者, key-ref 和 bean id 定義一致 --><entry key-ref="redisMessageListener"><!--監(jiān)聽類 --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry><entry key-ref="redisMessageListener2"><!--監(jiān)聽類 --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry></map></property></bean> </beans>

有了監(jiān)聽類還不能進(jìn)行測試。為了進(jìn)行測試 , 要給一個監(jiān)昕容器 , 在 Spring 中己有類org.springframework.data . redi s. li stener.RedisMessageListenerContainer。它可 以用于監(jiān)聽 Redis的發(fā)布訂閱消息,上面配置的topicContainer就是為了實現(xiàn)這個功能。

這里配置了線程池,這個線程池將會持續(xù)的生存 以等待消息傳入 , 而這里配置了容器用id 為 redisMessageListener 和 redisMessageListener2的 Bean 進(jìn)行對渠道 talk的監(jiān)聽 。當(dāng)消息通過渠道 talk發(fā)送的時候,就會使用 id 為 redisMessageListener和redisMessageListener2 的 Bean 進(jìn)行處理消息。

測試類

package com.artisan.redis.publish;import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.data.redis.core.RedisTemplate;public class PublishSubscribeTest {public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/spring-redis-publish.xml");RedisTemplate redisTemplate = ctx.getBean(RedisTemplate.class);String channel = "talk";redisTemplate.convertAndSend(channel, "artisan-talk");} }

convertAndSend 方法就是向渠道 talk發(fā)送消息的, 當(dāng)發(fā)送后對應(yīng)的監(jiān)聽者就能監(jiān)聽到消息了。運(yùn)行它,后臺就會打出對應(yīng)的消息:

INFO : org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@73a8dfcc: startup date [Thu Sep 27 23:55:12 CST 2018]; root of context hierarchy INFO : org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/spring-redis-publish.xml] INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 RedisMessageListener:artisan-talk RedisMessageListener2:artisan-talk RedisMessageListener2:talk RedisMessageListener:talk RedisMessageListener:talk RedisMessageListener2:talk

客戶端中肯定也有對應(yīng)的輸出,如果打開了客戶端的話


pubsub不足之處

PubSub 的生產(chǎn)者傳遞過來一個消息,Redis 會直接找到相應(yīng)的消費(fèi)者傳遞過去。如果一個消費(fèi)者都沒有,那么消息直接丟棄。

如果開始有三個消費(fèi)者,一個消費(fèi)者突然掛掉了,生產(chǎn)者會繼續(xù)發(fā)送消息,另外兩個消費(fèi)者可以持續(xù)收到消息。但是掛掉的消費(fèi)者重新連上的時候,這斷連期間生產(chǎn)者發(fā)送的消息,對于這個消費(fèi)者來說就是徹底丟失了。

如果 Redis 停機(jī)重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機(jī)就相當(dāng)于一個消費(fèi)者都沒有,所有的消息直接被丟棄。

正是因為 PubSub 有這些缺點,它幾乎找不到合適的應(yīng)用場景。Redis5.0 新增了 Stream 數(shù)據(jù)結(jié)構(gòu),這個功能給 Redis 帶來了持久化消息隊列,從此 PubSub 可以消失了。

代碼

代碼托管到了 https://github.com/yangshangwei/redis_learn

《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的Redis-13Redis发布订阅的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。