SpringBoot2.0 with a Redis Cluster: Implementing a Message Queue Scenario
Source code for this article
GitHub: 知了一笑
https://github.com/cicadasmile/middle-ware-parent
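I. Introduction to Redis Clusters
1. The RedisCluster concept
Redis Cluster is Redis's distributed solution, introduced in version 3.0. It addresses the need for a distributed Redis deployment: when one node goes down, traffic can quickly be switched to another. Redis Cluster mainly targets scenarios that combine massive data volumes, high concurrency, and high availability. Note that the configuration in this article connects through Redis Sentinel (JedisSentinelPool), which provides failover for a master and its replicas.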
II. Integrating with SpringBoot2.0
1. Core dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${redis-client.version}</version>
</dependency>

2. Core configuration
spring:
  # Redis cluster
  redis:
    sentinel:
      # Sentinel configuration
      master: mymaster
      nodes: 192.168.0.127:26379
      maxTotal: 60
      minIdle: 10
      maxWaitMillis: 10000
      testWhileIdle: true
      testOnBorrow: true
      testOnReturn: false
      timeBetweenEvictionRunsMillis: 10000

3. Parameter binding class
import org.springframework.boot.context.properties.ConfigurationProperties;

// Binds the spring.redis.sentinel.* properties shown above.
@ConfigurationProperties(prefix = "spring.redis.sentinel")
public class RedisParam {
    private String nodes;
    private String master;
    private Integer maxTotal;
    private Integer minIdle;
    private Integer maxWaitMillis;
    private Integer timeBetweenEvictionRunsMillis;
    private boolean testWhileIdle;
    private boolean testOnBorrow;
    private boolean testOnReturn;
    // getters and setters omitted
}

4. Cluster configuration class
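import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisSentinelPool;

@Configuration
@EnableConfigurationProperties(RedisParam.class)
public class RedisPool {
    @Resource
    private RedisParam redisParam;

    @Bean("jedisSentinelPool")
    public JedisSentinelPool getRedisPool() {
        // "nodes" is a comma-separated list of sentinel host:port entries
        Set<String> sentinels = new HashSet<>(Arrays.asList(redisParam.getNodes().split(",")));
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(redisParam.getMaxTotal());
        poolConfig.setMinIdle(redisParam.getMinIdle());
        poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis());
        poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle());
        poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow());
        poolConfig.setTestOnReturn(redisParam.isTestOnReturn());
        poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis());
        return new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig);
    }

    // SpringUtil and RedisListener are classes from this project
    @Bean
    SpringUtil springUtil() {
        return new SpringUtil();
    }

    @Bean
    RedisListener redisListener() {
        return new RedisListener();
    }
}

5. Configuring the Redis template class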
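import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisConfig {
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        return stringRedisTemplate;
    }
}

III. Simulated queue scenario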
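Producer-consumer pattern: the client listens on the message queue. When a message arrives, the consumer processes it immediately; when the queue is empty, the consumer keeps listening. Messages are enqueued with the Redis LPUSH command and dequeued with RPOP or its blocking variant BRPOP.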
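The push-left, pop-right behavior can be illustrated without a Redis server. The following is only an in-memory analogy: the class `ListQueueSketch` and its method names are hypothetical, and a `LinkedBlockingDeque` stands in for the Redis list. It shows that pushing at the head and popping from the tail yields first-in, first-out delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for a Redis list used as a queue.
class ListQueueSketch {
    private final BlockingDeque<String> list = new LinkedBlockingDeque<>();

    // Analogous to LPUSH: insert at the head (left) of the list.
    void lpush(String msg) {
        list.addFirst(msg);
    }

    // Analogous to BRPOP with a timeout: wait for an element at the tail (right).
    // Returns null if nothing arrives within the timeout.
    String brpop(long timeoutMillis) {
        try {
            return list.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Pushes three messages, then drains the queue and returns the consumption order.
    static List<String> demo() {
        ListQueueSketch queue = new ListQueueSketch();
        queue.lpush("m1");
        queue.lpush("m2");
        queue.lpush("m3");
        List<String> consumed = new ArrayList<>();
        String msg;
        while ((msg = queue.brpop(50)) != null) {
            consumed.add(msg);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [m1, m2, m3]
    }
}
```

Against a real Redis instance the consumer would typically block with a timeout of 0 (wait indefinitely); the short timeout here only lets the demo terminate.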
1. Lock and unlock utility
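The utility in this section relies on the Redis SET command with the NX option and an expiry. As a rough mental model only, the semantics can be sketched in memory: the class `NxExpirySketch` is hypothetical, and the clock is passed in explicitly so the behavior is deterministic. A key can be set only when it is absent or has already expired.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of "SET key value NX EX seconds"; not a Redis client.
class NxExpirySketch {
    // key -> absolute expiry time in milliseconds
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Sets the key only if it is absent or already expired, as NX does.
    // Returns true when the caller acquired the key.
    synchronized boolean setIfAbsent(String key, long ttlSeconds, long nowMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > nowMillis) {
            return false; // key is still held and has not expired
        }
        expiresAt.put(key, nowMillis + ttlSeconds * 1000);
        return true;
    }

    // Analogous to DEL: removes the key unconditionally.
    synchronized void delete(String key) {
        expiresAt.remove(key);
    }

    public static void main(String[] args) {
        NxExpirySketch store = new NxExpirySketch();
        System.out.println(store.setIfAbsent("RedisLock:a", 60, 0));      // first caller acquires
        System.out.println(store.setIfAbsent("RedisLock:a", 60, 1_000));  // rejected, still held
        System.out.println(store.setIfAbsent("RedisLock:a", 60, 61_000)); // expired, acquired again
    }
}
```

The expiry is what keeps a crashed holder from blocking others forever. Because the delete is unconditional, a holder that outlives its expiry could remove a lock that someone else has since acquired; storing a unique token as the value and checking it before deleting is a common way to guard against that.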
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

@Component
public class RedisLock {
    private static final String keyPrefix = "RedisLock:";

    @Resource
    private JedisSentinelPool jedisSentinelPool;

    public boolean addLock(String key, long expire) {
        try (Jedis jedis = jedisSentinelPool.getResource()) {
            /*
             * nxxx must be NX or XX: with NX the key is set only if it does not exist,
             * with XX only if it already exists.
             * expx must be EX or PX, the unit of the expiry time: EX is seconds, PX is milliseconds.
             * This five-argument set(...) is the Jedis 2.x signature.
             */
            String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire);
            return value != null;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public void removeLock(String key) {
        try (Jedis jedis = jedisSentinelPool.getResource()) {
            jedis.del(keyPrefix + key);
        }
    }
}

2. Message consumption
1) Wrapper interface
public interface RedisHandler {
    /**
     * Queue name
     */
    String queueName();

    /**
     * Handles the body of a queue message
     */
    String consume(String msgBody);
}

2) Interface implementation
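import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class LogAListen implements RedisHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class);

    @Resource
    private RedisLock redisLock;

    @Override
    public String queueName() {
        return "LogA-key";
    }

    @Override
    public String consume(String msgBody) {
        // Lock to guard against duplicate message delivery.
        // The key is fixed here for demonstration; while it is held, other messages
        // for this handler are skipped, so a real system would likely derive the key
        // from a unique message ID.
        String lockKey = "lock-order-uuid-A";
        boolean lock = false;
        try {
            lock = redisLock.addLock(lockKey, 60);
            if (!lock) {
                return "success";
            }
            LOG.info("LogA-key == >>" + msgBody);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (lock) {
                redisLock.removeLock(lockKey);
            }
        }
        return "success";
    }
}

3. Message listener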
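import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.springframework.beans.factory.InitializingBean;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

public class RedisListener implements InitializingBean {
    /**
     * Redis cluster
     */
    @Resource
    private JedisSentinelPool jedisSentinelPool;

    private List<RedisHandler> handlers = null;
    private ExecutorService product = null;
    private ExecutorService consumer = null;

    /**
     * Initialization
     */
    @Override
    public void afterPropertiesSet() {
        handlers = SpringUtil.getBeans(RedisHandler.class);
        product = new ThreadPoolExecutor(10, 15, 60 * 3, TimeUnit.SECONDS, new SynchronousQueue<>());
        // With a SynchronousQueue, a task is rejected once all 15 threads are busy.
        // CallerRunsPolicy makes the listening thread run the task itself instead of
        // dropping a message that has already been popped from Redis.
        consumer = new ThreadPoolExecutor(10, 15, 60 * 3, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        // One listening task per handler
        for (RedisHandler redisHandler : handlers) {
            product.execute(() -> redisTask(redisHandler));
        }
    }

    /**
     * Queue listening loop
     */
    public void redisTask(RedisHandler redisHandler) {
        while (true) {
            try (Jedis jedis = jedisSentinelPool.getResource()) {
                // Timeout 0 blocks until a message arrives; the reply is [queue name, message body]
                List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName());
                if (msgBodyList != null && msgBodyList.size() > 1) {
                    consumer.execute(() -> redisHandler.consume(msgBodyList.get(1)));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4. Message producer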
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

@Service
public class RedisServiceImpl implements RedisService {
    @Resource
    private JedisSentinelPool jedisSentinelPool;

    @Override
    public void saveQueue(String queueKey, String msgBody) {
        // Push the message onto the head of the list
        try (Jedis jedis = jedisSentinelPool.getResource()) {
            jedis.lpush(queueKey, msgBody);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5. Test endpoint for the scenario
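import java.util.Date;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// Assumption: JSONObject comes from fastjson, which provides the static toJSONString method
import com.alibaba.fastjson.JSONObject;

@RestController
public class RedisController {
    @Resource
    private RedisService redisService;

    /**
     * Pushes a message onto the queue
     */
    @RequestMapping("/saveQueue")
    public String saveQueue() {
        MsgBody msgBody = new MsgBody();
        msgBody.setName("LogAModel");
        msgBody.setDesc("description");
        msgBody.setCreateTime(new Date());
        redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody));
        return "success";
    }
}

IV. Source code location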
GitHub: 知了一笑
https://github.com/cicadasmile/middle-ware-parent
Gitee: 知了一笑
https://gitee.com/cicadasmile/middle-ware-parent