日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从锁的原理到构建分布式锁

發布時間:2025/3/18 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从锁的原理到构建分布式锁 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

最近在工作中遇到一個復雜的業務,必須進行加鎖,使邏輯串行化,但是在負載均衡下java傳統的鎖是沒有意義的,必須使用分布式鎖。分布式鎖的方案在網絡上有許多,這里更想探討這些方案背后的原理。

如何避免競爭條件

兩個或多個進程讀寫某些共享數據,而最后的結果取決于進程運行的精確時序,稱為競爭條件?!冬F代操作系統》

而鎖正是避免競爭條件的解決方案之一。 再列出《現代操作系統》一書中,所成為一個解決競爭條件的好方案的條件:

  • 任何兩個進程不能同時處于其臨界區
  • 不應對CPU的速度和數量做任何假設
  • 臨界區外運行的進程不得阻塞其他進程
  • 不得使進程無限期等待進入臨界區
  • 互斥量

    互斥量是一個可以處于兩態之一的變量:解鎖和加鎖。如,用0表示解鎖,其他值表示加鎖。當一個線程或是進程需要訪問臨界區時,如果互斥量當前是解鎖狀態,即線程可以自由進入臨界區,并更改互斥量的值,當然這個過程需要是原子性的。 如果互斥量已經加鎖,調用線程被阻塞,直到獲得鎖的線程將互斥量重置。 而我們平時使用的java的ReentrantLock也是基于互斥量的原理,使用CAS更新互斥量,更新成功即獲得鎖,否則進入阻塞。 根據互斥量的定義和ReentrantLock的實現原理,這里總結一下互斥量的性質:

  • 互斥量狀態判斷到變更的過程必須是原子性的
  • 當無法獲取互斥量時,阻塞線程
  • 魯棒性
  • 第3點是我加上去的,必須保證擁有魯棒性,即不會無端發生鎖丟失,鎖狀態的變更等異常情況。這里對應的是為了避免違反上述提到的設計避免競爭條件方案的第4個條件不得使進程無限期等待進入臨界區。若然不具備魯棒性,這種情況必定會發生。

    使用redis設計分布式鎖

    根據上面提到的互斥量概念,我們依此來利用redis設計出。

    互斥量狀態判斷到變更的過程必須是原子性的

    這點可以利用redis的一些原子性命令來實現,例如不存在即插入的SETNX命令。 而第二點

    當無法獲取互斥量時,阻塞線程

    如果所實現的僅僅是同一時間只允許一個線程(進程)進入臨界區,那么在SETNX命令失敗馬上返回,即實現tryLock功能。 如果要實現阻塞功能的話,比較困難,因為在多進程情況下,必須跨進程喚醒被阻塞的線程,所以這里利用自旋的方式去多次嘗試獲取鎖,直到超時,為了防止過多的空轉浪費CPU資源,可以在自旋過程中加入sleep操作。 最后一點

    魯棒性

    這個其實是最難實現的,原子性的操作,大多組件都會有提供,但是要提供一個組件的魯棒性,必須要考慮到所有情況,并且給出具體的解決方案。關于使用redis實現分布式鎖這點,可以參考http://www.cnblogs.com/0201zcr/p/5942748.html。

    對于分布式鎖來說,一般可以跨線程調用鎖,即在線程A加鎖,在線程B解鎖,因為分布式鎖覆蓋的范圍更大,鎖的可是進程級。這里要注意是否應該加以限制,因為臨界區外運行的進程不得阻塞其他進程,雖然提到的是進程,但在線程級是否應該支持,需要進行考慮。 線程A加鎖,而不進入臨界區,而創建線程B,由線程B去執行邏輯并且釋放鎖,這樣臨界區外的線程A將會阻塞后續到來的線程C、D。當然這樣并不會違反臨界區外運行的進程不得阻塞其他進程,但是要不要縮小這個范圍,限制到線程,就交由實現者去判斷。 最后貼出我所寫的用Redis實現分布式鎖的代碼

    /*** Created by BingZhong on 2017/7/29.** 基于Redis實現的分布式鎖*/ public final class RedisLockHelper {private static Logger logger = LoggerFactory.getLogger(RedisLockHelper.class);/*** redis操作幫助類,可以是其他封裝了redis操作的類*/private RedisHelper redisHelper;public static final long DEFAULT_TIMEOUT = 30 * 1000;public static final long DEFAULT_SLEEP_TIME = 100;private RedisLockHelper(RedisHelper redisHelper) {this.redisHelper = redisHelper;}public static RedisLockHelper getInstance(RedisHelper redisHelper) {return new RedisLockHelper(redisHelper);}/*** 創建鎖** @param mutex 互斥量* @param timeout 鎖的超時時間* @param sleepTime 線程自旋嘗試獲取鎖時的休眠時間* @param timeUnit 時間單位*/public RedisLock newLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {logger.info("創建分布式鎖,互斥量為{}", mutex);return new RedisLock(mutex, timeout, sleepTime, timeUnit);}public RedisLock newLock(String mutex, long timeout, TimeUnit timeUnit) {return newLock(mutex, timeout, DEFAULT_SLEEP_TIME, timeUnit);}public RedisLock newLock(String mutex) {return newLock(mutex, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);}public class RedisLock {/*** 用于創建redis健值對的鍵,相當于互斥量*/private final String mutex;/*** 鎖過期的絕對時間*/private volatile long lockExpiresTime = 0;/*** 鎖的超時時間*/private final long timeout;/*** 每次循環獲取鎖的休眠時間*/private final long sleepTime;/*** 鎖的線程持有者*/private volatile Thread lockHolder = null;private final ReentrantLock threadLock = new ReentrantLock();public RedisLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {this.mutex = mutex;this.timeout = timeUnit.toMillis(timeout);this.sleepTime = timeUnit.toMillis(sleepTime);}/*** 加鎖,將會一直嘗試獲取鎖,直到超時*/public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {acquireTimeout = timeUnit.toMillis(acquireTimeout);long acquireTime = acquireTimeout + System.currentTimeMillis();threadLock.tryLock(acquireTimeout, timeUnit);try {while (true) {boolean hasLock = tryLock();if (hasLock) {//獲取鎖成功return true;} else if (acquireTime < System.currentTimeMillis()) {break;}Thread.sleep(sleepTime);}} finally {if (threadLock.isHeldByCurrentThread()) {threadLock.unlock();}}return false;}/*** 嘗試獲取鎖,無論是否獲取到鎖都將直接返回而不會阻塞* 不支持重入鎖*/public boolean tryLock() {if (lockHolder == Thread.currentThread()) {throw new IllegalMonitorStateException("不支持重入鎖");}long currentTime = System.currentTimeMillis();String expires = String.valueOf(timeout + currentTime);//嘗試設置互斥量if (redisHelper.setNx(mutex, expires) > 0) {setLockStatus(expires);return true;} else {String currentLockTime = redisHelper.get(mutex);//檢查鎖是否超時if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {//獲取舊的鎖時間并設置互斥量String oldLockTime = redisHelper.getSet(mutex, expires);//判斷獲取到的舊值是否一致,不一致證明已經有另外的進程(線程)成功獲取到了鎖if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {setLockStatus(expires);return true;}}return false;}}/*** 該鎖是否被鎖住*/public boolean isLock() {String currentLockTime = redisHelper.get(mutex);//存在互斥量且鎖還為過時即鎖住return Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) > System.currentTimeMillis();}public String getMutex() {return mutex;}/*** 解鎖*/public boolean unlock() {//只有鎖的持有線程才能解鎖if (lockHolder == Thread.currentThread()) {//判斷鎖是否超時,沒有超時才將互斥量刪除if (lockExpiresTime > System.currentTimeMillis()) {redisHelper.del(mutex);logger.info("刪除互斥量[{}]", mutex);}lockHolder = null;logger.info("釋放[{}]鎖成功", mutex);return true;} else {throw new IllegalMonitorStateException("沒有獲取到鎖的線程無法執行解鎖操作");}}private void setLockStatus(String expires) {lockExpiresTime = Long.parseLong(expires);lockHolder = Thread.currentThread();logger.info("獲取[{}]鎖成功", mutex);}} }

    總結

    很多功能組件,其實都是基于一些概念來進行設計,對于避免競爭條件的方案,還有信號量,管程等,了解這些概念,才能在需要時設計出滿足自己需求的組件。

    轉載于:https://my.oschina.net/bingzhong/blog/1559876

    總結

    以上是生活随笔為你收集整理的从锁的原理到构建分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。