Zookeeper分布式锁的使用
由于公司引入了dubbo+zookeeper框架,里面不可避免的引入的zookeeper分布式鎖,所以自己大致了解了一下。由于是自己研究,有不正確的地方還請大佬批評指正。
首先先介紹一下自己對zookeeper分布式鎖的理解,之后會引入一版別人的感覺比較好的描述給大家
1.dubbo的微服務后場生產者會暴露接口給前場的消費者。在zookeeper會生成一個相應的節點,比如時候節點名字是/lock。
2.當多個客戶端訪問的時候,會在這個節點下依次生成臨時的順序節點,只有第一個節點可以獲取到鎖,其他節點等待。
3.當第一個節點釋放鎖,這個節點將會被刪除,這個行為會被客戶端監聽,一旦收到這個通知,剩余的客戶端會繼續搶鎖。
4.這種行為導致了羊群效應,所有在等待的客戶端都會被觸發。于是優化方法是每個客戶端應該對剛好在它之前的子節點設置事件監聽。我前面這個節點哥們完了,我再搶鎖,否則說明我前面有很多人,我就得繼續排隊
5.執行業務代碼邏輯
6.完成業務代碼,刪除對應的臨時節點,釋放鎖。
給大家推薦大牛的描述,簡單易懂:10分鐘看懂!基于Zookeeper的分布式鎖
說完zookeeper分布式鎖邏輯上的實現,下面介紹一下Curator,它也提供了對zookeeper分布式鎖實現
先看一下大致流程
public class ZKLockTestTread extends Thread{private Logger logger=LoggerFactory.getLogger(getClass());private final String ZKLOCK_NODE_PATH="/zklock/readfile";public ZKLockTestTread(){}@Overridepublic void run() {AbstractMutexLock lock=null;boolean lockflag=false;try{lock=ZKMutexLockFactory.getZKMutexLock(ZKLOCK_NODE_PATH);//指定zk的方式//lock=ZKMutexLockFactory.getZKMutexLock("127.0.0.1:8080", ZKLOCK_NODE_PATH);//lock.acquire();//爭鎖,無限等待lockflag=lock.acquire(10, TimeUnit.SECONDS);//爭鎖,超時時間10秒。if(lockflag){//獲取到分布式鎖,執行任務logger.info("SUCESS線程【"+Thread.currentThread().getName()+"】獲取到分布式鎖,執行任務");Thread.sleep(1000000000);}else{//未獲取到分布式鎖,不執行任務logger.info("FAILURE線程【"+Thread.currentThread().getName()+"】未獲取到分布式鎖,不執行任務");}} catch (PaasException e) {logger.error("線程【"+Thread.currentThread().getName()+"】獲取分布式鎖出錯:"+e.getMessage(),e);//e.printStackTrace();}catch(Exception e){logger.error("線程【"+Thread.currentThread().getName()+"】運行出錯:"+e.getMessage(),e);//e.printStackTrace();}finally{if(lock!=null&&lockflag){try {lock.release();logger.error("線程【"+Thread.currentThread().getName()+"】釋放分布式鎖OK");} catch (Exception e) {logger.error("線程【"+Thread.currentThread().getName()+"】釋放分布式鎖出錯:"+e.getMessage(),e);//e.printStackTrace();}}}//System.out.println("【"+Thread.currentThread().getName()+"】");} }這里指定了節點的路徑:
AbstractMutexLock lock=ZKMutexLockFactory.getZKMutexLock(ZKLOCK_NODE_PATH);獲得鎖的方式是:
boolean lockflag=lock.acquire(10, TimeUnit.SECONDS);返回值就是是否拿到了鎖,接著我們點進去看一眼,首先是接口
/*** Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call* to {@link #release()}** @param time time to wait* @param unit time unit* @return true if the mutex was acquired, false if not* @throws Exception ZK errors, connection interruptions*/public boolean acquire(long time, TimeUnit unit) throws Exception;意思是說直到他狀態是可用或者超過了超時時間才獲取互斥鎖,需用release()平衡,我們再進一步看一下實現
@Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception{return internalLock(time, unit);}調用了internalLock才真正的調用了zookeeper鎖。多說一句,time的值如果為-1,說明鎖被占用時永久阻塞等待
接著進入internalLock
private boolean internalLock(long time, TimeUnit unit) throws Exception{/*Note on concurrency: a given lockData instancecan be only acted on by a single thread so locking isn't necessary*/Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if ( lockData != null ){// re-enteringlockData.lockCount.incrementAndGet();return true;}String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if ( lockPath != null ){LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;} attemptLock嘗試去獲得鎖處理了zookeeper鎖 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{final long startMillis = System.currentTimeMillis();final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;boolean hasTheLock = false;boolean isDone = false;while ( !isDone ){isDone = true;try{ourPath = driver.createsTheLock(client, path, localLockNodeBytes);hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}catch ( KeeperException.NoNodeException e ){// gets thrown by StandardLockInternalsDriver when it can't find the lock node// this can happen when the session expires, etc. So, if the retry allows, just try it all againif ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){isDone = false;}else{throw e;}}}if ( hasTheLock ){return ourPath;}return null;}其中internalLockLoop:阻塞等待直到獲得鎖
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{boolean haveTheLock = false;boolean doDelete = false;try{if ( revocable.get() != null ){client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){List<String> children = getSortedChildren();String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slashPredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if ( predicateResults.getsTheLock() ){haveTheLock = true;}else{String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this){try {// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).forPath(previousSequencePath);if ( millisToWait != null ){millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ){doDelete = true; // timed out - delete our nodebreak;}wait(millisToWait);}else{wait();}}catch ( KeeperException.NoNodeException e ) {// it has been deleted (i.e. lock released). Try to acquire again}}}}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);doDelete = true;throw e;}finally{if ( doDelete ){deleteOurPath(ourPath);}}return haveTheLock;}?
總結
以上是生活随笔為你收集整理的Zookeeper分布式锁的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 王者荣耀吕布卡多少攻速
- 下一篇: Mybatis传入多参问题