日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

redisson 看门狗_Redisson的分布式锁

發(fā)布時(shí)間:2023/12/18 数据库 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redisson 看门狗_Redisson的分布式锁 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

最近想使用redisson的分布式鎖去替換系統(tǒng)中的redis分布式鎖從而解決續(xù)期問(wèn)題,查看了源碼,發(fā)現(xiàn)其原理還是比較容易理解的。

一、Maven配置

<dependency> <groupId>org.redissongroupId> <artifactId>redissonartifactId> <version>3.13.4version> dependency>

二、Springboot定義配置類

@Configurationpublic class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); // config.useClusterServers().addNodeAddress("redis://" + host + ":" + port); // 分片集群方式 SingleServerConfig server = config.useSingleServer(); config.setLockWatchdogTimeout(5 * 1000L); server.setAddress("redis://" + host + ":" + port); server.setPassword(password); RedissonClient redissonClient = Redisson.create(config); return redissonClient; }}

三、API

RedissionClient交互于Redis和Java。其常用的實(shí)現(xiàn)類為Redisson,上鎖/解鎖操作的API也很簡(jiǎn)單:

RLock lock = redissonClient.getLock("鎖的key");lock.lock();lock.unLock();

四、源碼解讀

redissionClient.getLock(“鎖的名稱”);本質(zhì)上是創(chuàng)建了一個(gè)RLock。

RLock lock =new RedissonLock(connectionManager.getCommandExecutor(), name);

1、加鎖RLock.lock

底層進(jìn)入lock(鎖的有效時(shí)間,時(shí)間單位,是否中斷)方法

/** * * @param leaseTime 鎖的有效期 * @param unit 時(shí)間單位 * @param interruptibly 中斷標(biāo)識(shí)位 * @throws InterruptedException */private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 嘗試獲取鎖的邏輯,返回值為表明redis中此鎖還剩余的有效時(shí)長(zhǎng) Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 如果鎖的有效時(shí)間為空,證明上鎖成功 if (ttl == null) { return; } RFuture future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } /** * 自旋的方式重試獲取鎖 */ try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // 如果鎖的有效時(shí)間為空,證明上鎖成功 if (ttl == null) { break; } // future.getNow().getLatch() 底層返回一個(gè)信號(hào)量Semaphore // 在ttl的時(shí)間內(nèi)去嘗試獲取許可 // 獲取不到則阻塞等待信號(hào)量的釋放或者ttl之后再去執(zhí)行下面代碼===> sleep(ttl) if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); }}

那我們看看核心方法tryAcquire是干什么的

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}/** * * @param waitTime 等待時(shí)長(zhǎng) * @param leaseTime 鎖的時(shí)長(zhǎng) * @param unit 時(shí)間單位 * @param threadId 線程ID * @param * @return */private RFuturetryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { // 如果參數(shù)中設(shè)置了鎖的時(shí)長(zhǎng)則直接通過(guò)lua腳本去嘗試創(chuàng)建redis中的節(jié)點(diǎn),并設(shè)置時(shí)長(zhǎng) return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 未設(shè)置時(shí)長(zhǎng)的情況下,使用看門狗配置的時(shí)長(zhǎng);lua腳本設(shè)置redis節(jié)點(diǎn) RFuture ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // 返回值為空,表明設(shè)置成功,則使用看門狗機(jī)制為鎖續(xù)期 if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture;}

分布式鎖獲取的lua腳本

RFuturetryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

????續(xù)期

/** * 定時(shí)續(xù)期操作 * @param threadId */private void scheduleExpirationRenewal(long threadId) { // 新建包裝續(xù)期操作的任務(wù)實(shí)體 ExpirationEntry entry = new ExpirationEntry(); // 放入實(shí)體map ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 設(shè)置線程ID oldEntry.addThreadId(threadId); } else { // 設(shè)置線程ID entry.addThreadId(threadId); // 續(xù)期 renewExpiration(); }}/** * 續(xù)期 */private void renewExpiration() { // 從任務(wù)實(shí)體map中獲取本次任務(wù)實(shí)體 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 封裝本次任務(wù), 定時(shí)為看門狗配置時(shí)間/3 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 判斷鎖是否還在 RFuture future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // 重新激活任務(wù) renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 任務(wù)實(shí)體設(shè)置任務(wù) ee.setTimeout(task);}

判斷鎖是否還在的lua腳本

protected RFuturerenewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

2、解鎖RLock.unLock

/** * 同步解鎖 * @param threadId 線程ID * @return */public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); // lua腳本刪除redis中的節(jié)點(diǎn) RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消任務(wù)的續(xù)期 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result;}

lua腳本刪除redis中的節(jié)點(diǎn)

protected RFutureunlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

取消任務(wù)續(xù)期

void cancelExpirationRenewal(Long threadId) { // 獲取任務(wù)實(shí)體 ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { // 任務(wù)取消線程的綁定 task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { // 取消任務(wù) timeout.cancel(); } // 移除任務(wù)實(shí)體 EXPIRATION_RENEWAL_MAP.remove(getEntryName()); }}

五、流程圖

加鎖原理

解鎖原理

六、思考

Redisson在獲取不到鎖的情況下,默認(rèn)是一直阻塞自旋的,如果業(yè)務(wù)中不想一直等待,該如何處理呢?

其實(shí)很簡(jiǎn)單,我們只要通過(guò)反射調(diào)用它嘗試獲取鎖的方法,從而規(guī)避自旋部分即可。

/** * 返回獲取鎖的狀態(tài),true表示上鎖成功 * * @param lockKey * @return */public boolean lockBackState(String lockKey) { try { RLock lock = redissonClient.getLock(lockKey); RedissonLock l = (RedissonLock) lock; Method method = l.getClass().getDeclaredMethod("tryAcquire", long.class, long.class, TimeUnit.class, long.class); method.setAccessible(true); Object invoke = method.invoke(l, -1L, -1L, null, Thread.currentThread().getId()); return invoke == null; } catch (Exception e) { e.printStackTrace(); return false; }}

歡迎大家和帝都的雁積極互動(dòng),頭腦交流會(huì)比個(gè)人埋頭苦學(xué)更有效!共勉!

CSDN:https://blog.csdn.net/yxh13521338301

總結(jié)

以上是生活随笔為你收集整理的redisson 看门狗_Redisson的分布式锁的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。