redis之mq实现发布订阅模式
https://github.com/smltq/spring-boot-demo/blob/master/mq-redis
概述
Redis不僅可作為緩存服務(wù)器,還可用作消息隊(duì)列,本示例演示如何使用redis實(shí)現(xiàn)發(fā)布/訂閱消息隊(duì)列。
在Redis中,發(fā)布者沒(méi)有將消息發(fā)送給特定訂閱者的程序。相反,發(fā)布的消息被描述為通道,而不知道(如果有的話)可能有哪些訂閱者。
訂閱者表示對(duì)一個(gè)或多個(gè)主題感興趣,只接收感興趣的消息,而不知道(如果有的話)發(fā)布者是什么。
發(fā)布者和訂閱者的這種解耦可以實(shí)現(xiàn)更大的可伸縮性和更動(dòng)態(tài)的網(wǎng)絡(luò)拓?fù)洹?br /> 代碼實(shí)現(xiàn)
redis實(shí)現(xiàn)mq的存儲(chǔ)方式很多,可以使用list,zset及stream,這些數(shù)據(jù)的存儲(chǔ)結(jié)構(gòu)決定了怎么消費(fèi)問(wèn)題(消息是一次使用、允許多次使用、允許多端消息等),比如使用list,我們可以使用leftPush插入消息,使用rightPop消費(fèi)消息,實(shí)現(xiàn)一條消息一次消息,可以參考與以示例代碼:
@Test
public void testMq() {
for (int i = 0; i < 10; i++) {
redisTemplate.opsForList().leftPush(“task-queue”, “data” + i);
log.info(“插入了一個(gè)新的任務(wù)==>{}”, “data” + i);
}
String taskId = redisTemplate.opsForList().rightPop(“task-queue”).toString();
log.info(“處理成功,清除任務(wù)==>{}”, taskId);
}
1.配置代碼RedisConfig.java
package demo.data.mqRedis.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
}
2.定義消息發(fā)布接口MessagePublisher.java
package demo.data.mqRedis.config;
public interface MessagePublisher {
void publish(String message);
}
3.發(fā)布方實(shí)現(xiàn)RedisMessagePublisher.java
package demo.data.mqRedis.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
/**
-
消息發(fā)布方
*/
public class RedisMessagePublisher implements MessagePublisher {@Autowired
private RedisTemplate<String, Object> redisTemplate;@Autowired
private ChannelTopic topic;public RedisMessagePublisher(
RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
this.redisTemplate = redisTemplate;
this.topic = topic;
}public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
4.消息接收方RedisMessageSubscriber.java
package demo.data.mqRedis.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
-
消息訂閱方
*/
@Service
@Slf4j
public class RedisMessageSubscriber implements MessageListener {public static List messageList = new ArrayList<>();
public void onMessage(Message message, byte[] pattern) {
messageList.add(message.toString());
log.info(“訂閱方接收到了消息==>{}”, message.toString());
}
}
5.最后貼上application.yml配置
spring:
redis:
host: 127.0.0.1
port: 6379
password:
查看運(yùn)行結(jié)果
1.編寫測(cè)試用例試發(fā)布消息TestRedisMQ.java
package demo.data.mqRedis;
import demo.data.mqRedis.config.RedisMessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestRedisMQ {
}
2.運(yùn)行結(jié)果
2019-09-05 15:51:33.931 INFO 10772 — [ container-2] d.d.m.config.RedisMessageSubscriber : 訂閱方接收到了消息==>“Message c95959bf-6c30-4801-bc80-0e1e3c9f81bc”
訂閱方成功接收到消息了
總結(jié)
以上是生活随笔為你收集整理的redis之mq实现发布订阅模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Redis 与 MQ 的区别
- 下一篇: 解决 org.hibernate.Hib