日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

redis之mq实现发布订阅模式

發(fā)布時(shí)間:2025/3/21 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis之mq实现发布订阅模式 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.


https://github.com/smltq/spring-boot-demo/blob/master/mq-redis

概述
Redis不僅可作為緩存服務(wù)器,還可用作消息隊(duì)列,本示例演示如何使用redis實(shí)現(xiàn)發(fā)布/訂閱消息隊(duì)列。

在Redis中,發(fā)布者沒(méi)有將消息發(fā)送給特定訂閱者的程序。相反,發(fā)布的消息被描述為通道,而不知道(如果有的話)可能有哪些訂閱者。
訂閱者表示對(duì)一個(gè)或多個(gè)主題感興趣,只接收感興趣的消息,而不知道(如果有的話)發(fā)布者是什么。
發(fā)布者和訂閱者的這種解耦可以實(shí)現(xiàn)更大的可伸縮性和更動(dòng)態(tài)的網(wǎng)絡(luò)拓?fù)洹?br /> 代碼實(shí)現(xiàn)
redis實(shí)現(xiàn)mq的存儲(chǔ)方式很多,可以使用list,zset及stream,這些數(shù)據(jù)的存儲(chǔ)結(jié)構(gòu)決定了怎么消費(fèi)問(wèn)題(消息是一次使用、允許多次使用、允許多端消息等),比如使用list,我們可以使用leftPush插入消息,使用rightPop消費(fèi)消息,實(shí)現(xiàn)一條消息一次消息,可以參考與以示例代碼:

@Test
public void testMq() {
for (int i = 0; i < 10; i++) {
redisTemplate.opsForList().leftPush(“task-queue”, “data” + i);
log.info(“插入了一個(gè)新的任務(wù)==>{}”, “data” + i);
}
String taskId = redisTemplate.opsForList().rightPop(“task-queue”).toString();
log.info(“處理成功,清除任務(wù)==>{}”, taskId);
}

1.配置代碼RedisConfig.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

@Autowired private RedisTemplate redisTemplate;/*** redisTemplate 序列化使用的jdkSerializeable, 存儲(chǔ)二進(jìn)制字節(jié)碼, 所以自定義序列化類,方便調(diào)試redis** @param redisConnectionFactory* @return*/ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();//使用Jackson2JsonRedisSerializer來(lái)序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());//使用StringRedisSerializer來(lái)序列化和反序列化redis的keredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());//開(kāi)啟事務(wù)redisTemplate.setEnableTransactionSupport(true);redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate; }@Bean MessageListenerAdapter messageListener() {return new MessageListenerAdapter(new RedisMessageSubscriber()); }@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, topic());return container; }@Bean MessagePublisher redisPublisher() {return new RedisMessagePublisher(redisTemplate, topic()); }@Bean ChannelTopic topic() {return new ChannelTopic("messageQueue"); }

}

2.定義消息發(fā)布接口MessagePublisher.java

package demo.data.mqRedis.config;

public interface MessagePublisher {
void publish(String message);
}

3.發(fā)布方實(shí)現(xiàn)RedisMessagePublisher.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**

  • 消息發(fā)布方
    */
    public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher(
    RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
    this.redisTemplate = redisTemplate;
    this.topic = topic;
    }

    public void publish(String message) {
    redisTemplate.convertAndSend(topic.getTopic(), message);
    }
    }
    4.消息接收方RedisMessageSubscriber.java
    package demo.data.mqRedis.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**

  • 消息訂閱方
    */
    @Service
    @Slf4j
    public class RedisMessageSubscriber implements MessageListener {

    public static List messageList = new ArrayList<>();

    public void onMessage(Message message, byte[] pattern) {
    messageList.add(message.toString());
    log.info(“訂閱方接收到了消息==>{}”, message.toString());
    }
    }

5.最后貼上application.yml配置

spring:
redis:
host: 127.0.0.1
port: 6379
password:

查看運(yùn)行結(jié)果
1.編寫測(cè)試用例試發(fā)布消息TestRedisMQ.java

package demo.data.mqRedis;

import demo.data.mqRedis.config.RedisMessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestRedisMQ {

@Autowired RedisMessagePublisher redisMessagePublisher;@Test public void testMq() {String message = "Message " + UUID.randomUUID();redisMessagePublisher.publish(message); }

}

2.運(yùn)行結(jié)果
2019-09-05 15:51:33.931 INFO 10772 — [ container-2] d.d.m.config.RedisMessageSubscriber : 訂閱方接收到了消息==>“Message c95959bf-6c30-4801-bc80-0e1e3c9f81bc”
訂閱方成功接收到消息了

總結(jié)

以上是生活随笔為你收集整理的redis之mq实现发布订阅模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。