ZooKeeper实战(三):ZooKeeper实现分布式配置中心、分布式锁、Reactive响应式模型
引入
ZooKeeper是做分布式協調的,那它協調啥?
配置寫在哪里?難道運維人員要登錄到每一臺機器一臺一臺地改嗎?
可以將配置文件放在一個共享的位置中,比如redis,比如數據庫,比如zk,任何一個地方。
zk具有回調機制,就不需要輪詢了。
分布式鎖的實現,這個zk也可以做,面試常問,雖然可能用不到,但是這道題可以帶出很多知識點。
使用ZooKeeper實現分布式配置中心
思路:我們將所有的配置數據用data配置到zk中去,在客戶端我們既可以get它,也可以watch它。
一旦外界對這個數據進行了修改,這個修改就會引發watch的回調。
1、四臺機器配好 Zookeeper 集群
使用zkServer.sh start啟動四臺Zookeeper
2、IDEA新建項目,目錄結構:
pom.xml 里面配好 zookeeper 依賴
3、代碼
TestConfig.java,主程序測試入口
package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test;public class TestConfig {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void getConf() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);MyConf myConf = new MyConf();watchCallBack.setMyConf(myConf);watchCallBack.awaitExists();//1,節點不存在//2,節點存在while (true) {if (myConf.getConf().equals("")) {System.out.println("conf diu le ......");watchCallBack.awaitExists();} else {System.out.println("In getConf Test, myConf is:" + myConf.getConf());}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}} }Myconf.java,用來存放配置。
真正的開發中,配置中的內容可能是一個復雜的json,xml等等。
此處用一個String conf字符串代替。
ZKUtils.java,用來配置Zookeeper的Server端服務器地址,以及創建并返回Zookeeper實例
package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper;import java.util.concurrent.CountDownLatch;public class ZKUtils {private static ZooKeeper zk;// 需要在zk中設置 create /testLock create /testLock/AppConf set /testLock/AppConf "hello,conf"// ip:port 后面的 /testLock 是指定的根目錄,后續所有的操作都在以 /testLock 為根目錄的基礎上進行private static String address = "10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181,10.0.0.134:2181/testLock";private static DefaultWatch watch = new DefaultWatch();private static CountDownLatch init = new CountDownLatch(1);public static ZooKeeper getZK() {try {zk = new ZooKeeper(address, 1000, watch);//new ZooKeeper 對象的時候需要使用到 DefaultWatchwatch.setLatch(init);init.await();} catch (Exception e) {e.printStackTrace();}return zk;} }DefaultWatch.java,在 new ZooKeeper() 對象的時候需要用到,作為參數傳入。
package com.msb.zookeeper.config;import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;public class DefaultWatch implements Watcher {CountDownLatch latch;public void setLatch(CountDownLatch latch) {this.latch = latch;}@Overridepublic void process(WatchedEvent event) {System.out.println(event.toString());switch (event.getState()) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:System.out.println("In DefaultWatch, SyncConnected,連接成功.");latch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;}} }WatchCallBack.java,實現多個接口,既是Watcher,又是Callback。重寫了各個回調函數,是整個Reactor模型的核心
package com.msb.zookeeper.config;import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {ZooKeeper zk;MyConf myConf;CountDownLatch latch = new CountDownLatch(1);public MyConf getMyConf() {return myConf;}public ZooKeeper getZk() {return zk;}public void setMyConf(MyConf myConf) {this.myConf = myConf;}public void setZk(ZooKeeper zk) {this.zk = zk;}public void awaitExists() {System.out.println("awaitExists...");zk.exists("/AppConf", this, this, "ABC");try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("awaitExists finish...");}@Override/*** DataCallback接口實現*/public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if (data != null) {String str = new String(data);System.out.println("In WatchCallBack, data is: " + str);myConf.setConf(str);latch.countDown();}}@Override/*** StatCallback 接口實現*/public void processResult(int rc, String path, Object ctx, Stat stat) {if (stat != null) {zk.getData("/AppConf", this, this, "sdfs");}}@Override/*** Watcher 接口實現*/public void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated://節點被創建事件zk.getData("/AppConf", this, this, "sdfs");break;case NodeDeleted://節點被刪除事件,這要根據業務的容忍性,寫處理方式System.out.println("In WatchCallBack, 節點被刪除了");myConf.setConf("");latch = new CountDownLatch(1);//阻塞break;case NodeDataChanged://數據被更改zk.getData("/AppConf", this, this, "sdfs");break;case NodeChildrenChanged:break;}} }4、運行 & 測試
運行 Test
控制臺輸出,可以看到由于zk.exists("/AppConf", this, this, "ABC");后面加了latch.await();正在阻塞地等待Zookeeper中對應節點的創建。
用客戶端連接Zookeeper
創建指定節點create /testLock/AppConf "foo"
檢測到配置文件節點的產生
修改配置文件的節點數據set /testLock/AppConf "bar"
檢測到節點數據的修改
刪除配置文件節點delete /testLock/AppConf
輸出“conf 丟了”,重新進入阻塞等待狀態…等待節點的創建
附:Zookeeper客戶端常用命令:
ls /列出根目錄所有節點名稱
create /testLock/AppConf "foo"創建節點并設置節點內容
set /testLock/AppConf "bar"更新節點內容
delete /testLock/AppConf刪除節點
使用Zookeeper實現分布式鎖
1、和前面一樣,配好4臺Zookeeper集群
zkServer.sh start 四臺zk都啟動一下
2、目錄結構
3、代碼
用到了前面的兩個:
- ZKUtils.java,獲取zk實例,配置server ip
- DefaultWatch.java,默認的Watcher,在 new ZooKeeper 對象的時候會用到
另外,增加了:
TestLock.java,測試分布式鎖的程序入口
package com.msb.zookeeper.lock;import com.msb.zookeeper.config.ZKUtils; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test;public class TestLock {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {for (int i = 0; i < 10; i++) {//模擬10個線程分布在10臺機器上new Thread() {@Overridepublic void run() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);String threadName = Thread.currentThread().getName();watchCallBack.setThreadName(threadName);//每一個線程://搶鎖watchCallBack.tryLock();//干活System.out.println(threadName + " working...");try {Thread.sleep(100);//模擬干活需要的時間,如果這里不sleep,會在第一個已經釋放了,后面的還沒開始watch,看不到第一個釋放鎖的消息,就斷層了} catch (InterruptedException e) {e.printStackTrace();}//釋放鎖watchCallBack.unLock();}}.start();}while (true) {}} }WatchCallBack.java,分布式鎖的Watcher,里面維護了一個CountDownLatch。同時,繼承了接口實現各種回調方法,所以既是Watcher,也是Callback,在傳參的時候直接傳this即可。
package com.msb.zookeeper.lock;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {ZooKeeper zk;String threadName;CountDownLatch cc = new CountDownLatch(1);String pathName;public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}/*** 通過zk創建順序節點的方式,獲取鎖*/public void tryLock() {try {System.out.println(threadName + " create....");zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "str");cc.await();//等待自己是第一個的時候,才能返回} catch (InterruptedException e) {e.printStackTrace();}}/*** 釋放鎖*/public void unLock() {try {zk.delete(pathName, -1);System.out.println(threadName + ": finish work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}@Override//process in interface Watcherpublic void process(WatchedEvent event) {//如果第一臺機器拿到的鎖釋放了,只有第二臺機器收到了回調事件//如果,不是第一臺機器,而是其他的某一臺機器掛了,也能造成他后邊的那臺機器收到這個通知,從而讓他后邊那臺機器跟去watch掛掉這個哥們前邊的機器switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zk.getChildren("/", false, this, "str");//去調用getChildren的回調方法break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}@Override//StringCallbackpublic void processResult(int rc, String path, Object ctx, String name) {if (name != null) {//接收zk幫你創建的順序節點的名稱System.out.println(threadName + " create node : " + name);pathName = name;zk.getChildren("/", false, this, "str");}}@Override//zk.getChildren 的 callbackpublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {//能看到自己前邊的所有節點 List<String> children,對前面所有的節點名稱排序Collections.sort(children);int index = children.indexOf(pathName.substring(1));//看自己的位置//判斷自己是否為第一個if (index == 0) {//如果自己是第一個,執行countdownSystem.out.println(threadName + ": I am first....");try {zk.setData("/", threadName.getBytes(), -1);//把自己的鎖信息寫進根目錄的node里面去,你可以用get看到,這個步驟為了拖慢一下速度cc.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else {//自己不是第一個,則watch前一個zk.exists("/" + children.get(index - 1), this, this, "sdf");}}@Override// StatCallbackpublic void processResult(int rc, String path, Object ctx, Stat stat) {//偷懶,這里省略沒寫} }4、運行 & 測試
輸出結果如下,可以看到10個線程在創建之后,都拿到了自己的自增目錄名稱,然后按照自己的目錄名稱順序依次進行:拿到鎖,干活,釋放鎖。
WatchedEvent state:SyncConnected type:None path:null In DefaultWatch, SyncConnected,連接成功. Thread-3 create.... Thread-9 create.... Thread-1 create.... Thread-7 create.... Thread-0 create.... Thread-5 create.... Thread-8 create.... Thread-4 create.... Thread-2 create.... Thread-6 create.... Thread-3 create node : /lock0000000071 Thread-9 create node : /lock0000000072 Thread-6 create node : /lock0000000073 Thread-0 create node : /lock0000000074 Thread-5 create node : /lock0000000075 Thread-7 create node : /lock0000000076 Thread-1 create node : /lock0000000077 Thread-4 create node : /lock0000000078 Thread-8 create node : /lock0000000079 Thread-2 create node : /lock0000000080 Thread-3: I am first.... Thread-3 working... Thread-3: finish work.... Thread-9: I am first.... Thread-9 working... Thread-9: finish work.... Thread-6: I am first.... Thread-6 working... Thread-6: finish work.... Thread-0: I am first.... Thread-0 working... Thread-0: finish work.... Thread-5: I am first.... Thread-5 working... Thread-5: finish work.... Thread-7: I am first.... Thread-7 working... Thread-7: finish work.... Thread-1: I am first.... Thread-1 working... Thread-1: finish work.... Thread-4: I am first.... Thread-4 working... Thread-4: finish work.... Thread-8: I am first.... Thread-8 working... Thread-8: finish work.... Thread-2: I am first.... Thread-2 working... Thread-2: finish work....在程序執行的同時,從Zookeeper的Client端不斷獲取目錄內容,看到整個過程大致是這樣的:
當一個線程中的任務執行完之后,顯示調用了zk.delete(pathName, -1);將節點刪除。這樣就能觸發下一個正在watch它的節點所在的線程,去判斷自己是不是在排名的第一個。
- 如果是第一個,開始干活
- 如果不是第一個,watch它的前一個(這種情況出現在前一個節點突然掛了的情況)
總結
以上是生活随笔為你收集整理的ZooKeeper实战(三):ZooKeeper实现分布式配置中心、分布式锁、Reactive响应式模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode 121. 买卖股票的最
- 下一篇: 响应式web(四):使用Netty作为w