日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从Curator实现分布式锁的源码再到羊群效应

發布時間:2023/12/18 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从Curator实现分布式锁的源码再到羊群效应 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、前言

Curator是一款由Java編寫的,操作Zookeeper的客戶端工具,在其內部封裝了分布式鎖、選舉等高級功能。

今天主要是分析其實現分布式鎖的主要原理,有關分布式鎖的一些介紹或其他實現,有興趣的同學可以翻閱以下文章:

我用了上萬字,走了一遍Redis實現分布式鎖的坎坷之路,從單機到主從再到多實例,原來會發生這么多的問題

Redisson可重入與鎖續期源碼分析

在使用Curator獲取分布式鎖時,Curator會在指定的path下創建一個有序的臨時節點,如果該節點是最小的,則代表獲取鎖成功。

接下來,在準備工作中,我們可以觀察是否會創建出一個臨時節點出來。


二、準備工作

首先我們需要搭建一個zookeeper集群,當然你使用單機也行。

在這篇文章面試官:能給我畫個Zookeeper選舉的圖嗎?,介紹了一種使用docker-compose方式快速搭建zk集群的方式。

在pom中引入依賴:

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>

Curator客戶端的配置項:

/*** @author qcy* @create 2022/01/01 22:59:34*/ @Configuration public class CuratorFrameworkConfig {//zk各節點地址private static final String CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183";//連接超時時間(單位:毫秒)private static final int CONNECTION_TIME_OUT_MS = 10 * 1000;//會話超時時間(單位:毫秒)private static final int SESSION_TIME_OUT_MS = 30 * 1000;//重試的初始等待時間(單位:毫秒)private static final int BASE_SLEEP_TIME_MS = 2 * 1000;//最大重試次數private static final int MAX_RETRIES = 3;@Beanpublic CuratorFramework getCuratorFramework() {CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_STRING).connectionTimeoutMs(CONNECTION_TIME_OUT_MS).sessionTimeoutMs(SESSION_TIME_OUT_MS).retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)).build();curatorFramework.start();return curatorFramework;}}

SESSION_TIME_OUT_MS參數則會保證,在某個客戶端獲取到鎖之后突然宕機,zk能在該時間內刪除當前客戶端創建的臨時有序節點。

測試代碼如下:

//臨時節點路徑,qcy是博主名字縮寫哈private static final String LOCK_PATH = "/lockqcy";@ResourceCuratorFramework curatorFramework;public void testCurator() throws Exception {InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, LOCK_PATH);interProcessMutex.acquire();try {//模擬業務耗時Thread.sleep(30 * 1000);} catch (Exception e) {e.printStackTrace();} finally {interProcessMutex.release();}}

當使用接口調用該方法時,在Thread.sleep處打上斷點,進入到zk容器中觀察創建出來的節點。

使用? docker exec -it zk容器名 /bin/bash? 以交互模式進入容器,接著使用? ?./bin/zkCli.sh? 連接到zk的server端。

然后使用? ls path? 查看節點

這三個節點都是持久節點,可以使用? get path? 查看節點的數據結構信息

若一個節點的ephemeralOwner值為0,即該節點的臨時擁有者的會話id為0,則代表該節點為持久節點。

當走到斷點Thread.sleep時,確實發現在lockqcy下創建出來一個臨時節點

到這里嗎,準備工作已經做完了,接下來分析interProcessMutex.acquire與release的流程


三、源碼分析

Curator支持多種類型的鎖,例如

  • InterProcessMutex,可重入鎖排它鎖
  • InterProcessReadWriteLock,讀寫鎖
  • InterProcessSemaphoreMutex,不可重入排它鎖

今天主要是分析InterProcessMutex的加解鎖過程,先看加鎖過程

加鎖

public void acquire() throws Exception {if (!internalLock(-1, null)) {throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}

這里是阻塞式獲取鎖,獲取不到鎖,就一直進行阻塞。所以對于internalLock方法,超時時間設置為-1,時間單位設置成null。

private boolean internalLock(long time, TimeUnit unit) throws Exception {Thread currentThread = Thread.currentThread();//通過能否在map中取到該線程的LockData信息,來判斷該線程是否已經持有鎖LockData lockData = threadData.get(currentThread);if (lockData != null) {//進行可重入,直接返回加鎖成功lockData.lockCount.incrementAndGet();return true;}//進行加鎖String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath != null) {//加鎖成功,保存到map中LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;}

其中threadData是一個map,key線程對象,value為該線程綁定的鎖數據。

LockData中保存了加鎖線程owningThread,重入計數lockCount與加鎖路徑lockPath,例如/lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);private LockData(Thread owningThread, String lockPath) {this.owningThread = owningThread;this.lockPath = lockPath;}}

進入到internals.attemptLock方法中

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//開始時間final long startMillis = System.currentTimeMillis();//將超時時間統一轉化為毫秒單位final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//節點數據,這里為nullfinal byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;//重試次數int retryCount = 0;//鎖路徑String ourPath = null;//是否獲取到鎖boolean hasTheLock = false;//是否完成boolean isDone = false;while (!isDone) {isDone = true;try {//創建一個臨時有序節點,并返回節點路徑//內部調用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//依據返回的節點路徑,判斷是否搶到了鎖hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);} catch (KeeperException.NoNodeException e) {//在會話過期時,可能導致driver找不到臨時有序節點,從而拋出NoNodeException//這里就進行重試if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {isDone = false;} else {throw e;}}}//獲取到鎖,則返回節點路徑,供調用方記錄到map中if (hasTheLock) {return ourPath;}return null;}

接下來,將會在internalLockLoop中利用剛才創建出來的臨時有序節點,判斷是否獲取到了鎖。

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {//是否獲取到鎖boolean haveTheLock = false;boolean doDelete = false;try {if (revocable.get() != null) {//當前不會進入這里client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}//一直嘗試獲取鎖while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//返回basePath(這里是lockqcy)下所有的臨時有序節點,并且按照后綴從小到大排列List<String> children = getSortedChildren();//取出當前線程創建出來的臨時有序節點的名稱,這里就是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005String sequenceNodeName = ourPath.substring(basePath.length() + 1);//判斷當前節點是否處于排序后的首位,如果處于首位,則代表獲取到了鎖PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//獲取到鎖之后,則終止循環haveTheLock = true;} else {//這里代表沒有獲取到鎖//獲取比當前節點索引小的前一個節點String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized (this) {try {//如果前一個節點不存在,則直接拋出NoNodeException,catch中不進行處理,在下一輪中繼續獲取鎖//如果前一個節點存在,則給它設置一個監聽器,監聽它的釋放事件client.getData().usingWatcher(watcher).forPath(previousSequencePath);if (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();//判斷是否超時if (millisToWait <= 0) {//獲取鎖超時,刪除剛才創建的臨時有序節點doDelete = true;break;}//沒超時的話,在millisToWait內進行等待wait(millisToWait);} else {//無限期阻塞等待,監聽到前一個節點被刪除時,才會觸發喚醒操作wait();}} catch (KeeperException.NoNodeException e) {//如果前一個節點不存在,則直接拋出NoNodeException,catch中不進行處理,在下一輪中繼續獲取鎖}}}}} catch (Exception e) {ThreadUtils.checkInterrupted(e);doDelete = true;throw e;} finally {if (doDelete) {//刪除剛才創建出來的臨時有序節點deleteOurPath(ourPath);}}return haveTheLock;}

判斷是否獲取到鎖的核心邏輯位于getsTheLock中

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//獲取當前節點在所有子節點排序后的索引位置int ourIndex = children.indexOf(sequenceNodeName);//判斷當前節點是否處于子節點中validateOurIndex(sequenceNodeName, ourIndex);//InterProcessMutex的構造方法,會將maxLeases初始化為1//ourIndex必須為0,才能使得getsTheLock為true,也就是說,當前節點必須是basePath下的最小節點,才能代表獲取到了鎖boolean getsTheLock = ourIndex < maxLeases;//如果獲取不到鎖,則返回上一個節點的名稱,用作對其設置監聽String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {if (ourIndex < 0) {//可能會由于連接丟失導致臨時節點被刪除,因此這里屬于保險措施throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);}}

那什么時候,在internalLockLoop處于wait的線程能被喚醒呢?

在internalLockLoop方法中,已經使用

client.getData().usingWatcher(watcher).forPath(previousSequencePath);

給前一個節點設置了監聽器,當該節點被刪除時,將會觸發watcher中的回調

private final Watcher watcher = new Watcher() {//回調方法@Overridepublic void process(WatchedEvent event) {notifyFromWatcher();}};private synchronized void notifyFromWatcher() {//喚醒所以在LockInternals實例上等待的線程notifyAll();}

到這里,基本上已經分析完加鎖的過程了,在這里總結下:

首先創建一個臨時有序節點

如果該節點是basePath下最小節點,則代表獲取到了鎖,存入map中,下次直接進行重入。

如果該節點不是最小節點,則對前一個節點設置監聽,接著進行wait等待。當前一個節點被刪除時,將會通知notify該線程。

解鎖

解鎖的邏輯,就比較簡單了,直接進入release方法中

public void release() throws Exception {Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if (lockData == null) {throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}int newLockCount = lockData.lockCount.decrementAndGet();//直接減少一次重入次數if (newLockCount > 0) {return;}if (newLockCount < 0) {throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}//到這里代表重入次數為0try {//釋放鎖internals.releaseLock(lockData.lockPath);} finally {//從map中移除threadData.remove(currentThread);}}void releaseLock(String lockPath) throws Exception {revocable.set(null);//內部使用guaranteed,會在后臺不斷嘗試刪除節點deleteOurPath(lockPath);}

重入次數大于0,就減少重入次數。當減為0時,調用zk去刪除節點,這一點和Redisson可重入鎖釋放時一致。


四、羊群效應

在這里談談使用Zookeeper實現分布式鎖場景中的羊群效應

什么是羊群效應

首先,羊群是一種很散亂的組織,漫無目的,缺少管理,一般需要牧羊犬來幫助主人控制羊群。

某個時候,當其中一只羊發現前面有更加美味的草而動起來,就會導致其余的羊一哄而上,根本不管周圍的情況。

所以羊群效應,指的是一個人在進行理性的行為后,導致其余人直接盲從,產生非理性的從眾行為。

而Zookeeper中的羊群效應,則是指一個znode被改變后,觸發了大量本可以被避免的watch通知,造成集群資源的浪費。

獲取不到鎖時的等待演化

sleep一段時間

如果某個線程在獲取鎖失敗后,完全可以sleep一段時間,再嘗試獲取鎖。

但這樣的方式,效率極低。

sleep時間短的話,會頻繁地進行輪詢,浪費資源。

sleep時間長的話,會出現鎖被釋放但仍然獲取不到鎖的尷尬情況。

所以,這里的優化點,在于如何變主動輪詢為異步通知。

watch被鎖住的節點

所有的客戶端要獲取鎖時,只去創建一個同名的node。

當znode存在時,這些客戶端對其設置監聽。當znode被刪除后,通知所有等待鎖的客戶端,接著這些客戶端再次嘗試獲取鎖。

雖然這里使用watch機制來異步通知,可是當客戶端的數量特別多時,會存在性能低點。

當znode被刪除后,在這一瞬間,需要給大量的客戶端發送通知。在此期間,其余提交給zk的正常請求可能會被延遲或者阻塞。

這就產生了羊群效應,一個點的變化(znode被刪除),造成了全面的影響(通知大量的客戶端)。

所以,這里的優化點,在于如何減少對一個znode的監聽數量,最好的情況是只有一個。

watch前一個有序節點

如果先指定一個basePath,想要獲取鎖的客戶端,直接在該路徑下創建臨時有序節點。

當創建的節點是最小節點時,代表獲取到了鎖。如果不是最小的節點,則只對前一個節點設置監聽器,只監聽前一個節點的刪除行為。

這樣前一個節點被刪除時,只會給下一個節點代表的客戶端發送通知,不會給所有客戶端發送通知,從而避免了羊群效應。

在避免羊群效應的同時,使得當前鎖成為公平鎖。即按照申請鎖的先后順序獲得鎖,避免存在饑餓過度的線程。


五、后語

本文從源碼角度講解了使用Curator獲取分布式鎖的流程,接著從等待鎖的演化過程角度出發,分析了Zookeeper在分布式鎖場景下避免羊群效應的解決方案。

這是Zookeeper系列的第二篇,關于其watch原理分析、zab協議等文章也在安排的路上了。

總結

以上是生活随笔為你收集整理的从Curator实现分布式锁的源码再到羊群效应的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。