日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis进阶- Redisson分布式锁实现原理及源码解析

發布時間:2025/3/21 数据库 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis进阶- Redisson分布式锁实现原理及源码解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • Pre
  • 用法
  • Redisson分布式鎖實現原理
  • Redisson分布式鎖源碼分析
    • redisson.getLock(lockKey) 的邏輯
    • redissonLock.lock()的邏輯
    • redissonLock.unlock();邏輯
  • 總結


Pre

Redis進階-細說分布式鎖中我們梳理了使用Redis實現分布式鎖的演進過程,并提出了目前最完善的解決方案:Redisson 實現分布式鎖 。

這里我們來分析下Redisson分布式鎖實現原理及源碼解析


用法

使用redisson實現分布式鎖的操作步驟,三部曲

  • 第一步: 獲取鎖 RLock redissonLock = redisson.getLock(lockKey);
  • 第二步: 加鎖,實現鎖續命功能 redissonLock.lock();
  • 第三步:釋放鎖 redissonLock.unlock();

Redisson分布式鎖實現原理

熟悉了基本用法以后,我們來看下Redission實現分布式鎖的原理,再理解了原理之后,后續梳理源碼實現就更加得心應手了。


Redisson分布式鎖源碼分析

流程圖如下

重點主要是依賴lua腳本的原子性,實現加鎖和釋放鎖的功能

redisson.getLock(lockKey) 的邏輯

@Overridepublic RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);}

實例化RedissonLock,我們看下RedissonLock的構造函數

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();}
  • super(commandExecutor, name); 父類name賦值,后續通過getName()獲取

  • commandExecutor: 執行lua腳本的executor

  • id 是個UUID, 后面被用來當做 和threadId組成 value值,用作判斷加鎖和釋放鎖是否是同一個線程的校驗。

  • internalLockLeaseTime : 取自 Config#lockWatchdogTimeout,默認30秒,這個參數還有另外一個作用,鎖續命的執行周期 internalLockLeaseTime/3 = 10秒


redissonLock.lock()的邏輯

主要是實現加鎖和鎖的續命

redissonLock.lock();

看看都干了啥

@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}

繼續看 lockInterruptibly

@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}

繼續看 lockInterruptibly(-1, null);

@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 獲取當前線程IDlong threadId = Thread.currentThread().getId();// 嘗試獲取鎖的剩余時間 Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquired ttl為空,說明沒有線程持有該鎖,直接返回 讓當前線程加鎖成功 if (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);// 死循環 try {while (true) {// 再此嘗試獲取鎖的剩余時間 ,如果為null, 跳出循環ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for message 如果ttl >=0 說明 有其他線程持有該鎖if (ttl >= 0) {// 獲取信號量,嘗試加鎖,設置最大等待市場為ttlgetEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 如果ttl小于0 (-1 ,-2 ) 說明已經過期,直接獲取getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);} // get(lockAsync(leaseTime, unit));}

大流程已經梳理完了,我們看下 Long ttl = tryAcquire(leaseTime, unit, threadId);

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}

繼續看下

tryAcquireAsync(leaseTime, unit, threadId) private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 剛開始 leaseTime 傳入的是 -1 ,所以走這個分支// 1)嘗試加鎖 待會細看 先把主要的邏輯梳理完RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 2) 注冊監聽事件ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {// 3)獲取鎖成功的話,給鎖延長過期時間 scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}

繼續看

// 1)嘗試加鎖 待會細看 先把主要的邏輯梳理完RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

看實現

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

lua 腳本

KEYS[1] ---------> getName()
ARGV[1] ---------> internalLockLeaseTime
ARGV[2] ---------> getLockName(threadId) 實現如下

String getLockName(long threadId) {return id + ":" + threadId;}

這個id就是自開始實例化RedissonLock的id ,是個UUID

我們來解釋下這段lua腳本

// 如果 lockKey不存在 ,設置 使用hset設置 lockKey ,field為 uuid:threadId ,value為1 ,并設置過期時間//就是這個命令 //127.0.0.1:6379> hset lockkey uuid:threadId 1//(integer) 1//127.0.0.1:6379> PEXPIRE lockkey internalLockLeaseTime"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 如果 lockKey 存在和 filed 和 當前線程的uuid:threadId相同 key 加1 ,執行多少次 就加多次 設置過期時間 其實就是如下命令//127.0.0.1:6379> HEXISTS lockkey uuid:threadId//(integer) 1//127.0.0.1:6379> PEXPIRE lockkey internalLockLeaseTime"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// 最后返回 lockkey的 pttl "return redis.call('pttl', KEYS[1]);"

那繼續監聽時間中的 scheduleExpirationRenewal(threadId); 邏輯

private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {// 重點是run方法 @Overridepublic void run(Timeout timeout) throws Exception {// 又是lua腳本 判斷是否存在,存在就調用pexpire RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));// 監聽事件中又 調用了自己 scheduleExpirationRenewalfuture.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}

redissonLock.unlock();邏輯

@Overridepublic void unlock() {Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}if (opStatus) {cancelExpirationRenewal();}}

重點看 unlockInnerAsync(Thread.currentThread().getId())

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}

又是lua腳本,核心就是 把value減到為0 ,刪除key

KEYS[1] ---------> getName()

KEYS[2] ---------> getChannelName()
ARGV[1] ---------> LockPubSub.unlockMessage
ARGV[2] ---------> internalLockLeaseTime
ARGV[2] ---------> getLockName(threadId)


總結

需要用到續鎖功能時,一要記住不要設置鎖的過期時間,可以設置成-1.

一旦設了時間,RedissonLock就會認為你需要自己控制鎖時間,而放棄執行續鎖邏輯。
查看源碼, 續鎖邏輯需要起定時器。所以要注意這點,并不是所有分布式場景都需要續鎖邏輯的。當我們很難判斷業務邏輯的執行時間時,不妨開啟續鎖。

至此,原理和源碼我們粗略的梳理完了 ,梳理了主要的核心流程,主要是依靠lua腳本,代碼寫的還是非常優秀的,向開源學習!!!

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Redis进阶- Redisson分布式锁实现原理及源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。