日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

年轻人,看看Redisson分布式锁—可重入锁吧!太重要了

發布時間:2024/1/18 数据库 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 年轻人,看看Redisson分布式锁—可重入锁吧!太重要了 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.引言

作為后端開發,對于所謂的線程安全、高并發等一系列名詞肯定都不會陌生,相關的一些概念及技術框架是面試中的寵兒,也是工作中解決一些特定場景下的技術問題的銀彈。今天我們就來聊聊這些銀彈中的其中一枚——分布式鎖,更確切的說是分布式鎖的其中一種輪子:Redisson 的可重入鎖——基于 redis 實現的分布式鎖。

俗話說得好:面試造火箭,工作擰螺絲(手動狗頭)。分布式鎖大家應該也都不陌生,在解決譬如分布式服務下庫存超賣等類似的場景下很常見,大家也應該都學習和使用過相關的框架,或者自己實現過類似的分布式鎖。但一個完備的分布式鎖實現應該考慮哪些問題,如何優雅且全面的實現一個分布式鎖,以及這些實現背后的原理,這些可能才是我們需要考慮的。就我而言,自己造不了火箭,但學習一下火箭是怎么造的還是蠻有意思的,說不定哪天螺絲擰多了需要自己造火箭呢。退一步講,火箭造出來的過程也是一個個螺絲擰出來的嘛。只要屠龍刀在手,一刀 999,現在能殺雞,將來也能宰牛, skr skr ~

2.分布式鎖概覽

2.1 鎖是干嘛的?

談到鎖,第一印象就是 Java 本身支持的一些加鎖,不管是是 synchronized 還是并發包下 Lock 的一些實現如 ReentrantLock,更甚一些無鎖機制譬如并發包下的一些原子類如 AtomicInteger,都在不同粒度上保證了線程安全。而所謂的線程實現,歸根到底也就是保證共享區或者共享變量的安全而已,而聊到線程安全,立馬就能聯想到三個特性:

  • 原子性

  • 可見性

  • 有序性

對于 Java 語言本身的一些保證線程安全的實現如 synchronized、Lock、原子類更甚至一些在此基礎上的一些線程安全的集合是如何在不同粒度上保證原子性、可見性、有序性的這里就不拆開了,也不是本篇要討論的。更為重要的,我們回到共享資源這個概念,對于單點下的應用,為什么一提到線程安全就是三大特性,為什么這三個特性可以保證線程安全。我的通俗理解是:

  • 對于共享資源的加鎖或者修改(原子類)是原子的,不可拆分的,那么加鎖或者修改本身就要么成功要么失敗,不會有中間過程的誤判

  • 而可見性是保證共享資源對所有線程可見,這個也沒什么好解釋的,只有對共享資源的任何修改都可感知,才不會在不同線程下決策不同

  • 有序性的前提是多線程,單線程下的指令重排不會改變串行結果,但多線程下的指令重排下對共享區域的修改會相互干擾,所以保證多線程的有序性也是必須的

加鎖是手段,保證共享資源的安全才是目的,單點下 Java 是通過原子性、可見性、有序性來實現的。

2.2 分布式鎖需要考慮什么?

前面我們廢話了一堆,可以看出來鎖的目的:保證共享資源的安全?,F在不考慮單點鎖還是分布式鎖:我們考慮兩個問題:

  • 共享資源在哪里?

  • 是否保證了原子性、有序性、可見性加鎖就一定是完備的

對于第一個問題,在單點情況下,我們可以共享資源是一個實例變量或者是一段對資源進行操作的代碼,這些資源在不同線程下共享,而這里的線程都是在一個 JVM 進程中。那么如果是分布式系統呢?舉個例子:某個商品的總庫存、某個優惠券批次的總數量,這些是共享資源嗎?當然是,只是這里共享這些資源的對象從一個 JVM 進程下的多個線程變成了多個服務節點下的多個 JVM 進程下的多個線程而已。下面通過兩張圖我們可以對比一下:

1.單進程下共享資源

2.分布式系統下共享資源

可以看出來,在單個 JVM 進程中,共享資源只是在同一進程下的不同線程共享,不管共享資源是實例變量、代碼段、或者數據庫的資源,所以我們可以通過單點下的原子性、有序性、可見性來保證共享資源的安全。

而在分布式系統下,共享資源的范圍就擴大到了多臺機器下的多個進程中的多個線程中。那么再看一下第二個問題,在分布式系統下原子性、有序性、可見性還管用嗎?或者說這三個特性在分布式系統下還有用嗎?我的理解是:這三個特性是依然存在的,只是針對的對象和范圍發生了變化。在單點情況下,任何共享資源都共存于同一個 JVM 進程中,共享資源狀態同步的范圍也只是在線程的工作內存和主內存之間而已,或者說共享資源的最終狀態在主內存,而其變化狀態發生在單點下的多線程的各自工作內存中,這三個特性所在的容器也只是單個 JVM 進程而已。而分布式系統下,共享資源的狀態同步范圍擴大了多臺機器各自的進程(更細致一點是各個進程中不同的線程之間),共享資源的最終狀態最終一定要依賴于 JVM 進程外的第三方,比如數據庫、任意形式的服務器等等,而共享資源的狀態變化發生在多個進程下的多個線程,因此分布式下的共享資源的安全保證,不僅僅是在線程之間,也在進程之間。

2.3 分布式鎖要提供的最基礎的能力

當然,前面一段理解可能有點過于冗繁,也可以說:分布式系統下整個服務集群是一個大容器,狀態的同步范圍在集群服務所有的線程之間,只是這些線程的交互不再只是通過單機的緩存一致性協議(如 MESI 協議等),而是擴大到了端到端的通信即網絡交互,而共享資源的直接宿主也在第三方譬如其他服務、數據庫等。那這時候這三個特性的范圍如果也相應的擴大到集群線程之間,那共享資源的安全自然也是能夠保證的。當然,這么說可能不太嚴謹,因為我也沒在相關的資料上看到過有人在分布式系統之間使用原子性、有序性、可見性來說明分布式系統的多線程安全,這是只是借鑒思想,大家如果感覺名詞不夠專業,輕噴。

前面簡單討論了分布式系統下的共享資源以及保證線程安全的三個特性,我們考慮一下如何才能在分布式系統這個大容器下保證這三個特性,或者說如何在分布式系統下加鎖?首先,鎖的共享范圍必然是要和要保護的資源一致的,在單點下共享資源就在單個 JVM 進程中,那么鎖依靠 JVM 中的一些手段也就足夠了,比如 synchronized、Lock、原子類(當然這個是無鎖的)等。而在分布式系統下,鎖的生存范圍必然是和集群節點平級的,要不然各個節點各自用自己的鎖,大家對于對方的鎖根本不認識也無法交流那豈不是亂掉了。所以分布式鎖必須獨立于各個節點之外,比如借助 redis、zookeeper 等實現,當然,本篇我們討論的是 redis 的分布式鎖,但我認為前面的思想是通用的,哪怕不用 redis、zookeeper 也可以實現,只是實現方式、效率等方面有所差異。即分布式鎖最起碼要實現進程間共享(這里的共享是指在不同進程間是一套,而不是說可以同時持有),并且能夠保證共享資源的原子性、有序性、可見性。

這里多說一點,由于分布式鎖宿主在 JVM 進程之間,各個進程加鎖以及同步是通過端到端的進程通信,那么此時分布式系統下的可見性、有序性是自然滿足的。首先可見性很好理解,因為共享資源的獲取本身就是服務與服務間的通信,可見性的粒度也應該在服務,只要共享資源發生改變,任何一個服務都可以查詢到(不要說事務什么的,我覺得這里共享資源的狀態同步應該是在事務的上層來看)。而有序性也是在分布式鎖的前提下,不同服務之間對于共享資源的變更也變成了時間上是串行的,那么也自然滿足的,當然這里會有性能的犧牲。那么原子性呢?我理解這里的原子性是靠分布式鎖的獲取等來保證的,只要加鎖、釋放等是原子的,那么鎖所保護的資源(或操作)對于同級的操作就是一個原子的。

2.4 分布式鎖還要考慮什么?

前面討論了分布式鎖怎么保證共享資源的安全,但是由于分布式鎖宿主在譬如 redis、zookeeper 等中間件中,加鎖、釋放、鎖續期等也是在進程與 redis 之間通信,那么就引出了一些單點加鎖不存在的問題:那就是服務如果宕機了怎么辦?或者加鎖是有時間的,如果時間過了持有鎖的任務還沒有完成怎么辦?這時候看起來就像下圖可能出現的情況

出現這些問題的原因是雖然我們將分布式系統和鎖的宿主看作一個大的通信系統,但其卻是離散的,離散的節點自身可能存活、死亡等,在單個離散節點不存在時,其持有的鎖卻可能仍在另外一個離散節點存在(這里指的是依靠 redis 實現的分布式鎖),那么對于其他節點來說鎖也就永遠無法獲取了。反過來,如果持有鎖的離散的服務節點對于共享資源的操作還沒有完成,Redis 由于鎖的時間到期而釋放鎖,那么其他的服務節點就可以獲取到本不該獲取的鎖了,這時候共享資源必然是不安全的。而這些在單個進程中的鎖不會存在,因為單進程下的鎖、線程、資源都在一個容器即 JVM 進程中,JVM 進程死掉的話這些也就一起死掉了,自然也不會存在之前說的問題。可見,分布式鎖不僅要維護共享資源的安全,還要維護鎖自身在不同進程下的安全問題。

3. redis 分布式鎖的一種實現—— Redisson 的可重入鎖

3.1 如何使用 Redisson 的分布式鎖

寫到這里,我覺得前面的文字鋪墊的太多了,代碼和圖片太少了,但對我個人而言我覺得會使用分布式鎖沒有什么太大的意義,所以我前面還是堅持寫了一些冗繁的廢話。那么,我們先看一下如何最簡單的使用 Redisson 的分布式鎖吧,畢竟 Talk is cheap,show me the code !

@Autowired private RedissonClient redisson;private static final String LOCK_PREFIX = "my:lock:";@GetMapping("redis/lock/{seq}") public String lock(@PathVariable String seq) {RLock lock = redisson.getLock(LOCK_PREFIX + seq);try {boolean lockSuccess = lock.tryLock(5, TimeUnit.SECONDS);if (lockSuccess) {System.out.println("get lock success");} else {System.out.println("get lock fail");}TimeUnit.SECONDS.sleep(15);} catch (InterruptedException e) {e.printStackTrace();return seq + " mission dead";} finally {lock.unlock();}return seq + " mission completed"; }@Bean public RedissonClient redissonClient() {Config config = new Config();// 單點模式config.useSingleServer().setAddress("redis://127.0.0.1:6379");// 集群模式/*config.useClusterServers().addNodeAddress("redis://127.0.0.1:7000").addNodeAddress("redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002").addNodeAddress("redis://127.0.0.1:7003").addNodeAddress("redis://127.0.0.1:7004").addNodeAddress("redis://127.0.0.1:7005");*/return Redisson.create(config); }

Redisson 實現的分布式鎖的使用就是這么簡單,這個也沒什么好說的,我們公司的不少服務應該也都有過使用,就我接觸到的有兌換券、優惠券等。下面我們就基于這段簡單的代碼來理解一下 Redisson 的分布式鎖是如何實現的。

3.1 RedissonClient:同 redis 通信的組件

public class Redisson implements RedissonClient {static {RedissonObjectFactory.warmUp();RedissonReference.warmUp();}protected final QueueTransferService queueTransferService = new QueueTransferService();protected final EvictionScheduler evictionScheduler;protected final ConnectionManager connectionManager;protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();protected final Config config;protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();protected final ConcurrentMap<String, ResponseEntry> responses = PlatformDependent.newConcurrentHashMap();protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);connectionManager = ConfigSupport.createConnectionManager(configCopy);evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());}public EvictionScheduler getEvictionScheduler() {return evictionScheduler;}public CommandExecutor getCommandExecutor() {return connectionManager.getCommandExecutor();}public ConnectionManager getConnectionManager() {return connectionManager;}/*** Create sync/async Redisson instance with default config** @return Redisson instance*/public static RedissonClient create() {Config config = new Config();config.useSingleServer().setTimeout(1000000).setAddress("redis://127.0.0.1:6379"); // config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399"); // config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379"); // config.useClusterServers().addNodeAddress("127.0.0.1:7000");return create(config);}/*** Create sync/async Redisson instance with provided config** @param config for Redisson* @return Redisson instance*/public static RedissonClient create(Config config) {Redisson redisson = new Redisson(config);if (config.isReferenceEnabled()) {redisson.enableRedissonReferenceSupport();}return redisson;}@Overridepublic RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);}// 省略巴拉巴拉 }

不得不說,這段代碼復制粘貼的是有點臭長啊,畢竟 CV 工程師,哈哈??偨Y起來就是一句話:Redisson 類是 RedissonClient 的實現,封裝了一些配置、同 redis 的連接管理、一些定時任務、發布訂閱組件等,另外提供一些獲取 Redisson 基于 Redis 實現的分布式鎖、分布式集合、分布式信號量等接口方法,比如我們的分布式鎖-可重入鎖。

public RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name); }

而這里實際上我們獲取到的只是 Redisson 封裝好的對分布式鎖的抽象的對象而已,并不是真正的就執行加鎖操作了。而加鎖、釋放鎖等就是基于 Redisson 的鎖接口 RLock 來做的,而本文討論的可重入鎖則 RedissonLock 是其中一種實現。

3.2 RedissonLock 是如何加鎖的

下面我們就以 demo 的代碼為入口看一下 RedissonLock 是如何加鎖的:

@Override public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit); }

通過 demo 中使用的 tryLock(long waitTime, TimeUnit unit) 我們可以看出來,真正調用的是下面這個方法:

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 加鎖成功 返回null 否則返回的是該鎖將要過期的剩余時間// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;// 未獲取到鎖,且第一次嘗試獲取鎖花費時間超過了預設等待時間,則獲取鎖失敗,不再等待if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);} // return get(tryLockAsync(waitTime, leaseTime, unit)); }

而關于這段代碼呢,我們先忽略其他邏輯,重點看這一行:

Long ttl = tryAcquire(leaseTime, unit, threadId);

這一行第一次嘗試加鎖,接著往下看:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId)); }private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 如果自己設置了鎖釋放時間,則獲取鎖后直接返回,且不會設置定時刷新的邏輯(上層方法沒有設置定時任務),則獲取到鎖后超過設定的事件后自動釋放// 或者在設定時間內手動調用釋放鎖if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 未獲取到鎖if (e != null) {return;}// 獲取到鎖,開啟自動延期鎖的定時任務// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture; }

可以看出來對于 leaseTime != -1 的判斷會走兩種方式:真正的加鎖是通過 tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) 這個方法來做的,而當 leaseTime != -1 時,直接返回加鎖結果了,而當 leaseTime = -1 時,在返回加鎖結果之前,會監聽加鎖的結果:如果加鎖成功了還會開啟一個自動延期鎖的定時任務。而這個 leaseTime 指的就是加鎖成功后鎖的默認持有時間。當我們不指定 leaseTime 時,默認的鎖持有時間是 30 秒(這個時間叫作看門狗 - lockWatchdogTimeout),并且每 10 秒(30/3)去確認一下鎖的狀態:如果鎖仍未被釋放,則重新設置鎖的過期時間為 30 秒(當然,持有鎖的服務宕機后在 30 秒后鎖會自動釋放,這個我們后面再說)。而當我們指定 leaseTime 時,我們可以看出來前面的代碼不會走到定時續期鎖的邏輯,這時表示:成功獲取到鎖后,在 leaseTime 后,如果鎖仍沒有被服務主動釋放,鎖將自動過期,而不會管持有鎖的線程有沒有完成對應的操作,相當于在持有所得服務執行了比較耗時的任務且未完成時,這時鎖已經被釋放,這時候自然也是不安全的。上面兩段代碼的流程如下:

從前面的流程圖我們可以看出,RedissonLock.tryLock(long waitTime, long leaseTime, TimeUnit unit) 是對于 waitTime, leaseTime 入參會產生不同的行為,這也是 RedissonLock 嘗試加鎖相對最完整的一個鏈路,其他方法譬如我們直接使用的 tryLock(long waitTime, TimeUnit unit) 也只是復用了其中一個邏輯分支。

3.3 RedissonLock分布式鎖的數據結構與加鎖原理

前面一小節我們看到了 RedissonLock 完整的加鎖鏈路,那么分布式鎖在 Redis 是如何實現的呢?怎么判斷加鎖失敗以及鎖的剩余時間呢?現在我們就來看看這個。

if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 巴拉巴拉 }

通過前面的代碼我們可以看出來真正執行加鎖以及返回加鎖結果是調用了下面的方法:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);// 鎖不存在,加鎖成功,設置hash數據結構鎖: 鎖名 -> 加鎖線程:id -> 加鎖次數(1)// 鎖存在且是本線程的鎖 加鎖次數增加:鎖名 -> 加鎖線程:id -> 加鎖次數+1// 鎖存在且不是本線程的鎖 加鎖失敗 返回鎖剩余過期時間return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

到了這里,我們才能看到 RedissonLock 的加鎖(僅僅指執行加鎖這一動作)以及鎖在 Redis 中的數據結構的廬山真面目:可以看出來上面是執行了一段 lua 腳本,這段 lua 腳 本是會涉及到多個判斷以及數據修改的,這個時候就可以回到我們說的關于加鎖的原子性問題了。先不看這段加鎖的邏輯,只考慮加鎖過程涉及到多個判斷以及操作時,那么那些動作必須是原子的,要么同時成功要么同時失敗,而 RedissonLock 實現加鎖過程的原子性就是借助了 lua 腳本(鎖延期等也會使用 lua 腳本)。那么我們看一下這段 lua 腳本的含義吧:結合注釋,Redisson 實現的可重入鎖的數據結構使用了 Redis中 的 hash 對象數據類型來實現,其在 Redis 中大概長這個樣子:

從上面這張圖我們可以看出來 Redisson 的分布式鎖在 Redis 中的 hash 數據結構:{鎖名}:{uuid:threadId}:{count},另外對于已經存在的健值對初始化過期時間為 30 秒。結合前面的加鎖流程圖,我們就可以看出來 Redisson 分布式鎖是如何實現加鎖的原子性,以下操作是一個原子操作:
  • 某一個節點下的線程加鎖首先判斷該線程對于的 hash 鍵是否存在

  • 若不存在(鎖未被持有),則將鎖的鍵設置為線程 id 對應的唯一標識,值為 1 (第一次加鎖),返回空表示加鎖成功

  • 鎖存在且對應的是本線程,說明之前加鎖的線程為同一個,則將 hash 值 1 (加鎖次數,可重入),另外將該鎖對應的存活時間重新設置,返回空表示加鎖成功

  • 鎖存在但鍵對應的不是當前線程,說明持有鎖的是其他線程,返回鎖剩余的過期時間表示加鎖失敗

到這里,Redisson 的分布式鎖加鎖的流程以及鎖在 Redis 中的數據結構已經清楚了,這時候我們可以對比一下 Java 自身實現的可重入鎖 ReentrantLock。對于 ReentrantLock,甚至更多的線程安全組件如 Semaphore、CountDownLatch 等,其底層的實現都依賴于 AQS(AbstractQueuedSynchronizer),而 AQS 本身是一個隊列,隊列中的節點 Node 同樣也是封裝了線程的對象,只是 AQS 是本地單節點的,Redis 卻是分布式的可以被任何 JVM 共享。另外 AQS 中還封裝了一個 int 類型的狀態變量 state:

/*** The synchronization state.*/ private volatile int state;

當涉及到具體的實現時,state 有不同的含義,對 ReentrantLock 來說 state 就是可重入鎖的加鎖次數,對 Semaphore 來說 state 就是信號量,對 CountDownLatch 來說就是計數量??梢钥闯鰜? Java 的 AQS 一些抽象和 Redisson 實現的分布式鎖是可以類比的,比如 thread 標識對應的封裝,加鎖次數等。只是 AQS 的實現原子操作一般是基于原子類的 CAS,而 Redisson 實現原子操作是基于 Redis 的 lua 腳本。另外 AQS 實現隊列節點狀態同步是基于隊列本身可以遍歷的特性以及節點中的幾種狀態(這里不再贅述),而 Redisson 不同線程之間阻塞同步是基于發布訂閱(后面會提到)??梢缘贸?#xff1a;本地鎖和分布式鎖很多概念和思想是相似的,甚至其數據結構以及目標都是可類比的,只是分布式鎖對本地鎖的對象、范圍、通信方式基于服務之間通信進行了實現。關于 AQS 的原理這里不再展開,大家可以參考 JDK 的源碼。

3.3 鎖的自動續期

前面我們從 Redisson 加鎖為入口,分析了加鎖的整體流程并詳細看了加鎖時的細節以及數據結構,現在我們看一下 Redisson 分布式鎖是如何自動續期的。前面我們已經提到了當第一次加鎖成功時會開啟自動續期的定時任務,對于的代碼入口即為:

// 獲取到鎖,開啟自動延期鎖的定時任務// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}

繼續往下看,進入如下代碼:

private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// 表示不是第一次加鎖 則加鎖次數加 1 不會再開啟續期鎖 因為第一次加鎖時調用 scheduleExpirationRenewal(long threadId) 會進入// else 會開啟 renewExpiration()oldEntry.addThreadId(threadId);} else {// 在加鎖時第一次調用 開啟自動續期(定時重設鎖的過期時間)entry.addThreadId(threadId);renewExpiration();} }

ExpirationEntry 封裝了定時任務對應的線程對象,結合注釋這一段也不必展開,我們繼續往下看真正開啟續期的方法 renewExpiration():

private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());// 鎖已經不存在了直接返回if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);// 這里監聽續期 成功后遞歸調用(十秒后再次重復)future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();}});}// 10 秒續期一次(如果還持有鎖) 30000/3}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task); }

可以看出來鎖自動續期的流程為:

  • 若鎖已經不存在了(比如手動釋放了鎖),直接返回

  • 若鎖仍存在,調用 Redis 異步設置鎖的過期時間 renewExpirationAsync(threadId),同時監聽續期結果

  • 若續期成功,則遞歸調用 renewExpiration(),否則異常返回

  • 以上過程每 10 秒重復一次 (internalLockLeaseTime / 3)

  • 然后我們看一下調用 Redis 對鎖進行續期的過程:

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {// 當前線程持有的鎖還存在 重新設置鎖的過期時間(默認 30 秒)// 否則失敗return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId)); }

    這里同樣使用 lua 腳本來執行了一段原子操作:

    • 判斷當前線程對應的鎖是否存在,若存在則重新設置鎖的過期時間(默認為 30 秒),返回 true

    • 否則返回 false

    3.4 鎖的手動釋放

    至此,Redisson 的加鎖、自動續期我們已經討論過了,現在看一下鎖的手動釋放, 其入口為:

    public void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)e.getCause();} else {throw e;}}}

    接著看底層實現 unlockAsync(final long threadId):

    public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 釋放鎖RFuture<Boolean> future = unlockInnerAsync(threadId);// 監聽釋放鎖結果future.onComplete((opStatus, e) -> {// 取消自動續期cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result; }

    可以看出來,釋放鎖會執行以下操作:

    • 調用 Redis 釋放鎖

    • 監聽釋放鎖結果,取消自動續期

    然后看一下真正釋放鎖的操作:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {// 若鎖不存在 返回// 若鎖存在 加鎖次數 -1// 若加鎖次數仍不等于 0 (可重入),重新設置鎖的過期時間,返回// 若加鎖次數減為 0,刪除鎖,同步發布釋放鎖事件,返回return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

    上面這段 lua 腳本的含義基本與注釋一致,這里不再贅述。至此,鎖會被原子性的釋放。

    3.5 加鎖等待

    討論了加鎖成功、鎖自動續期、鎖釋放后,我們再來看一下加鎖等待。前面加鎖的代碼中,我們可以看到,若制定了加鎖的等待時間 waitTime 時,若鎖已經被占有,加鎖會失敗并返回鎖剩余的過期時間,然后循環嘗試加鎖,對應以下代碼:

    current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 等待鎖釋放 循環獲取鎖while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}

    在上面的 while 循環中,我們可以看出來,每次循環都會調用

    ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

    這里就回到了我們之前分析的加鎖流程,不再贅述。整個加鎖等待流程如下:

    • 如果加鎖成功,返回成功

    • 加鎖失敗,基于發布訂閱(基于 Semaphore )阻塞,收到鎖釋放消息后繼續循環,再次嘗試加鎖

    • 如果整個加鎖嘗試時間超過了 waitTime 后仍然未搶到鎖,返回加鎖失敗

    4.總結

    至此,Redisson 基于 Redis 實現的分布式鎖的可重入鎖 RedissonLock 的大致原理就分析完了。我們分析了分布式系統下保證共享資源安全的一些必要特性,然后針對 Redisson 實現的可重入鎖的加鎖、自動續期、鎖釋放、鎖等待的代碼進行了分析,整個過程有所簡略,只關注了整體流程。更為細節的內容如如何和 Redis 進行通信、配置管理覆蓋、發布訂閱如何實現,感興趣的話大家可以自己探索一下。

    全文完


    以下文章您可能也會感興趣:

    • 簡單說說spring的循環依賴

    • Mysql redo log 漫游

    • 單元測試的實踐之路

    • 可線性化檢查:與 NP 完全問題做斗爭

    • Java 類型系統從入門到放棄

    • Webpack 快速上手(下)

    • Webpack 快速上手(中)

    • Webpack 快速上手(上)

    • Airbnb 的 React Native 之路(下)

    • Airbnb 的 React Native 之路(上)

    • 零基礎玩轉 Serverless

    • iOS 開發:深入理解 Xcode 工程結構(一)

    • 三大報表:財務界的通用語言

    • 四維閱讀法 - 我的高效學習“秘技”

    • 一個創業公司的容器化之路(三) - 容器即未來

    • 一個創業公司的容器化之路(二) - 容器化

    • 一個創業公司的容器化之路(一) - 容器化之前

    • 樂高式微服務化改造(下)

    • 樂高式微服務化改造(上)

    我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 rd-hr@xingren.com 。

    總結

    以上是生活随笔為你收集整理的年轻人,看看Redisson分布式锁—可重入锁吧!太重要了的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。