Zookeeper_watch机制核心讲解
生活随笔
收集整理的這篇文章主要介紹了
Zookeeper_watch机制核心讲解
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
zookeeper第一次接觸,API倒是很簡(jiǎn)單,watcher的東西,確實(shí)是有點(diǎn)復(fù)雜,咱們的demo講的也有點(diǎn)亂,今天把這個(gè)重新講一下,我覺(jué)得第一遍不懂得沒(méi)關(guān)系,如果第二遍你們還是不懂的話,你的好好研究了,本來(lái)原生的API就是有點(diǎn)麻煩,沒(méi)辦法,關(guān)于API的調(diào)用就不說(shuō)了,API怎么去使用,你自己去研究,我只能將一個(gè)大概的,今天講一個(gè)比較主要的,zookeeper中,API中,無(wú)論是什么set方法啊,還是一些get方法,還是其他一系列的方法,總有一個(gè)布爾類型watcher,這個(gè)東西很頭疼,我們簡(jiǎn)單看一眼,zookeeper里面有一個(gè)watch事件,watch事件和watcher是兩碼事,watch事件是一次性觸發(fā)的,當(dāng)watch監(jiān)視的數(shù)據(jù)發(fā)生變化的時(shí)候,zookeeper會(huì)主動(dòng)的去通知client端,client端就稱之為watcher,有兩大類,第一大類是咱們主要要關(guān)心的,數(shù)據(jù)一旦發(fā)生變化,zookeeper主動(dòng)去觸發(fā)的一個(gè)事件,事件返回給watcher,client端,數(shù)據(jù)節(jié)點(diǎn)發(fā)生變化,它會(huì)有相應(yīng)的事件產(chǎn)生,比如節(jié)點(diǎn)的創(chuàng)建,NodeCreate,還有NodeDataChanged,節(jié)點(diǎn)的數(shù)據(jù)發(fā)生變更,還有NodeChildChanged,我的節(jié)點(diǎn)的子節(jié)點(diǎn)發(fā)生變更,還有個(gè)叫NodeDeleted,就是把當(dāng)前節(jié)點(diǎn)刪除了,他有這4個(gè)事件,其實(shí)這4個(gè)事件是針對(duì)于你監(jiān)聽(tīng)的,觀察那一個(gè)節(jié)點(diǎn)而言的,他并不代表任何的子節(jié)點(diǎn),這是咱們對(duì)于數(shù)據(jù)發(fā)生變更產(chǎn)生的4種事件,還有一個(gè)是狀態(tài)類型,watcher和另外兩個(gè)角色,leader和follower,Disconnected是連接不上,連接上是SyncConnected,還有認(rèn)證失敗和過(guò)期,這4種類型,那你現(xiàn)在知道他的類型變化了
咱們畫(huà)一個(gè)圖看一下什么叫watcher和watch,好像有點(diǎn)抽象繁瑣,首先這塊是一個(gè)zookeeper,然后這邊是我的一個(gè)client端,這也是一個(gè)client端,有好多個(gè)client端,zookeeper就是一個(gè)集群,里面有3個(gè)Node,有3個(gè)節(jié)點(diǎn),第一個(gè)是Follower,第二個(gè)是leader,第三個(gè)是follower,這個(gè)就稱之為一個(gè)集群,但是他們的視圖都是單一的視圖,單一的view,他們?nèi)咧g的數(shù)據(jù)都是同步的,遵循原子消息廣播,利用了復(fù)制算法,下面我又一個(gè)C1,還有一個(gè)C2,兩個(gè)client端,我的WEB應(yīng)用我就部署了兩份了,假如現(xiàn)在zookeeper上的結(jié)構(gòu),有一個(gè)根節(jié)點(diǎn)就是斜杠/,下面有一個(gè)/zk節(jié)點(diǎn),還有一個(gè)/parent節(jié)點(diǎn),假如我C1這個(gè)節(jié)點(diǎn),想去監(jiān)控parent節(jié)點(diǎn)的話,他們兩個(gè)就建立一個(gè)連接了,相當(dāng)于建立一個(gè)watch連接,對(duì)應(yīng)的C1可以稱之為一個(gè)watcher,你可以簡(jiǎn)單這么去理解,當(dāng)然其實(shí)說(shuō)實(shí)話,你一個(gè)client可以擁有不同的watcher,可以擁有多個(gè)的,你代碼上new了幾個(gè),他就產(chǎn)生了幾個(gè)watcher,一般來(lái)講,一個(gè)節(jié)點(diǎn)就應(yīng)該有一個(gè)watcher,你可以簡(jiǎn)單的理解什么啊,先不說(shuō)太復(fù)雜的,C1監(jiān)聽(tīng)這個(gè)/parent,你可以把C1當(dāng)做一個(gè)watcher,如果C1監(jiān)聽(tīng)這個(gè)/parent,我對(duì)應(yīng)的/zk節(jié)點(diǎn)是不是也可以建立一個(gè)watcher,還可以建立一個(gè)watcher,這是可以的,我當(dāng)前C1這個(gè)節(jié)點(diǎn),咱們的client端就可以產(chǎn)生兩個(gè)watcher,怎么去建立呢,就是new一個(gè)watcher,咱們先不考慮藍(lán)色的那個(gè),咱們?nèi)タ醇t色的,到底是怎么去進(jìn)行監(jiān)控呢,咱們寫(xiě)一段代碼,單獨(dú)的process,單獨(dú)的一個(gè)線程去跟我的zk,watcher的一個(gè)關(guān)系,一旦我的C2對(duì)于/parent這個(gè)節(jié)點(diǎn),發(fā)生了任何的數(shù)據(jù)修改,你只要update,或者delete,只要下面加了一個(gè)孩子節(jié)點(diǎn)了,那么對(duì)應(yīng)的zk數(shù)據(jù)發(fā)生變化了,他就觸發(fā)事件,觸發(fā)完事件之后直接就返回去給了C1的client端,C1的client端就直接收到這個(gè)反饋了,收到反饋到底是什么變更,你自己通過(guò)process里面的代碼,自己if else判斷,判斷這個(gè)事件的狀態(tài)到底是什么,然后你做相應(yīng)的代碼的處理,當(dāng)然有一個(gè)非常重要的問(wèn)題,那什么是watch呢,我對(duì)你進(jìn)行監(jiān)控的動(dòng)作稱之為watch,watcher表示人你可以這么去理解,就是節(jié)點(diǎn)對(duì)應(yīng)的watcher,動(dòng)作就是watch,其實(shí)正常來(lái)講咱們的watch是一次性的,并不是一直在這里監(jiān)聽(tīng),我這邊第一次對(duì)她進(jìn)行修改,然后再次去修改一次,如果你watch了一次的話,就接收到一次,第二次再去修改的話,這邊就搜不到了這個(gè)zookeeper去設(shè)計(jì)的時(shí)候就是這么去做的,watch事件就是一次性的,如果你想一直去監(jiān)聽(tīng)這個(gè)節(jié)點(diǎn)的話,那你就需要重復(fù)的去進(jìn)行watch,watch有兩種方案,觸發(fā)完了下次還要不要監(jiān)聽(tīng),還要,那就寫(xiě)成true,那這里的true指的是誰(shuí)監(jiān)聽(tīng)啊,指的是之前的上次的監(jiān)聽(tīng),還讓他繼續(xù)去監(jiān)聽(tīng),第二種方案是你自己真正的再去new一個(gè),再new一個(gè)watcher,再去new一個(gè)watcher,那可能就是一個(gè)新的watcher,只能是這樣的一個(gè)邏輯,可能說(shuō)起來(lái)還有有點(diǎn)麻煩,今天講完了大概也能理解了吧,就是這個(gè)邏輯,既然是這樣的話,咱們來(lái)看一下代碼,代碼來(lái)說(shuō)明問(wèn)題,代碼寫(xiě)的比較亂,其實(shí)是因?yàn)槭裁窗?
package com.learn.zookeeper.watcher;import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/*** ZooKeeperWatcher這個(gè)類實(shí)現(xiàn)Watcher接口必須要重寫(xiě)一個(gè)方法* * 我連接上zookeeper了以后* 我會(huì)發(fā)生什么操作* * * @author Leon.Sun**/
public class ZooKeeperWatcher implements Watcher {/** 定義原子變量 *//*** 然后我上面定義了一個(gè)AtomicInteger變量* 實(shí)現(xiàn)這種原子性*/AtomicInteger seq = new AtomicInteger();/** 定義session失效時(shí)間 *//*** 然后我的超時(shí)時(shí)間是10秒*/private static final int SESSION_TIMEOUT = 10000;/** zookeeper服務(wù)器地址 */
// private static final String CONNECTION_ADDR = "192.168.1.121:2181,192.168.1.122:2181,192.168.1.123:2181";/*** 我的服務(wù)器的地址*/private static final String CONNECTION_ADDR = "59.110.138.145:2181";/** zk父路徑設(shè)置 *//*** 當(dāng)前有一個(gè)/p的節(jié)點(diǎn)* */private static final String PARENT_PATH = "/p";/** zk子路徑設(shè)置 *//*** 然后/p下面有一個(gè)/c* 現(xiàn)在就兩個(gè)節(jié)點(diǎn)*/private static final String CHILDREN_PATH = "/p/c";/** 進(jìn)入標(biāo)識(shí) *//*** 這里是一個(gè)靜態(tài)的字符串* 叫main一個(gè)靜態(tài)的字符串*/private static final String LOG_PREFIX_OF_MAIN = "【Main】";/** zk變量 *//*** 這里有一個(gè)zookeeper的實(shí)例* 我在創(chuàng)建zk實(shí)例的時(shí)候* 就new出來(lái)* */private ZooKeeper zk = null;/**用于等待zookeeper連接建立之后 通知阻塞程序繼續(xù)向下執(zhí)行 *//*** 咱們之前也看過(guò)helloworde程序了* 咱們想異步的從client端連接服務(wù)器的時(shí)候* 都是通過(guò)CountDownLatch去做一個(gè)異步的回調(diào)* 程序一開(kāi)始是阻塞的* 連接成功了就使用countdown* 然后讓他繼續(xù)往下走* 他只是一個(gè)這個(gè)邏輯* 這幾個(gè)成員變量簡(jiǎn)單的說(shuō)一下*/private CountDownLatch connectedSemaphore = new CountDownLatch(1);/*** 創(chuàng)建ZK連接* * 這里面有一個(gè)createConnection* 就是創(chuàng)建連接唄* * @param connectAddr ZK服務(wù)器地址列表* @param sessionTimeout Session超時(shí)時(shí)間*/public void createConnection(String connectAddr, int sessionTimeout) {/*** 就是有重復(fù)創(chuàng)建連接的時(shí)候先釋放一下連接* 就是后面的這個(gè)代碼*/this.releaseConnection();try {//this表示把當(dāng)前對(duì)象進(jìn)行傳遞到其中去(也就是在主函數(shù)里實(shí)例化的new ZooKeeperWatcher()實(shí)例對(duì)象)/*** 重新的去new一個(gè)zookeeper* new出一個(gè)zookeeper還是和之前的參數(shù)差不多* ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)* 先連接字符串地址address* 然后是超時(shí)的時(shí)間timeout* 以及你現(xiàn)在要使用的watcher了* 如果發(fā)生變更的話我都是通過(guò)這個(gè)watcher對(duì)象去對(duì)某一個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)控的* 然后做完這件事情之后* * this表示watcher當(dāng)前這個(gè)類* 然后給zookeeper了* * */zk = new ZooKeeper(connectAddr, sessionTimeout, this);/*** 打印了一句話* 這句話放在后面會(huì)好一點(diǎn)* 其實(shí)無(wú)所謂了* 開(kāi)始建立連接* 還是放在這里* * */System.out.println(LOG_PREFIX_OF_MAIN + "開(kāi)始連接ZK服務(wù)器");/*** 然后等著建立連接* 然后調(diào)用countdown方法繼續(xù)往下執(zhí)行* 然后剩余的代碼一會(huì)再說(shuō)吧*/connectedSemaphore.await();} catch (Exception e) {e.printStackTrace();}}/*** 關(guān)閉ZK連接*/public void releaseConnection() {/*** 如果你的zookeeper之前不等于空的話* 那我就直接close掉*/if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 創(chuàng)建節(jié)點(diǎn)* * 無(wú)非就是傳一個(gè)path* 傳一個(gè)數(shù)據(jù)data* * createPath做的事是這樣的* zookeeper開(kāi)始是沒(méi)有/p這個(gè)節(jié)點(diǎn)的* 他可以提前的去監(jiān)控* 這個(gè)節(jié)點(diǎn)還沒(méi)有就可以去監(jiān)控* 當(dāng)你這個(gè)節(jié)點(diǎn)真正的創(chuàng)建的時(shí)候* 創(chuàng)建好了以后* 在這里就收到了* 這個(gè)/p就已經(jīng)create了* 然后收到的事件 狀態(tài)是什么* NodeCreated* 現(xiàn)在是這個(gè)機(jī)制* ls /* 就會(huì)發(fā)現(xiàn)有一個(gè)/p節(jié)點(diǎn)* 然后我這里就會(huì)對(duì)應(yīng)有一個(gè)p節(jié)點(diǎn)* 然后我就直接delete掉* 我再ls /一下* 發(fā)現(xiàn)沒(méi)了* 咱們?cè)僭囈环N情況吧* 我所說(shuō)的那個(gè)意思* 你不watch的話那就沒(méi)有* 如果你寫(xiě)false的話* 我如果寫(xiě)false的話* this.zk.exists這個(gè)方法就 不watch了* 說(shuō)白了就變成false了* 就是你沒(méi)有監(jiān)控我的zookeeper* 也不是我的zookeeper* 這個(gè)watch它是一個(gè)布爾類型的* 要么是true要么是false* 那這個(gè)true表示誰(shuí)啊* 就是我當(dāng)前上下文的環(huán)境* watch那個(gè)人* 到底是不是要監(jiān)控這個(gè)節(jié)點(diǎn)* watch指的是誰(shuí)呢* 說(shuō)白了就是咱們create的時(shí)候* * 或者他在createPath的* needWatch是一個(gè)布爾類型* 但是你可以新建一個(gè)watcher* 你可以new一個(gè)watcher* 新建一個(gè)watcher表示你不使用上下文的一個(gè)watcher了* 你應(yīng)該單獨(dú)再來(lái)一個(gè)watcher* 你new這個(gè)watcher之后* 你還是得實(shí)現(xiàn)process方法* 這個(gè)就是布爾類型的watch* * 就是走這個(gè)create方法* 首先我這個(gè)/p/c是不存在的* * * * @param path 節(jié)點(diǎn)路徑* @param data 數(shù)據(jù)內(nèi)容* @return */public boolean createPath(String path, String data, boolean needWatch) {try {//設(shè)置監(jiān)控(由于zookeeper的監(jiān)控都是一次性的所以 每次必須設(shè)置監(jiān)控)/*** 在這里我這么去監(jiān)控了一下* 我之前是寫(xiě)死的* 我要做一個(gè)exists是否存在* 其實(shí)這個(gè)節(jié)點(diǎn)如果不存在watch也是生效的* 我這塊必須得watch一下* 不watch有什么效果呢* * 我是在創(chuàng)建節(jié)點(diǎn)之前watch的* 我needWatch等于true的* 然后我去創(chuàng)建這個(gè)節(jié)點(diǎn)* 那就是說(shuō)什么事啊* 我第一次去創(chuàng)建這個(gè)節(jié)點(diǎn)之前* 我用watcher去watch了這個(gè)path* path這個(gè)時(shí)候還沒(méi)有呢* 我創(chuàng)建的時(shí)候觸發(fā)了* 給你client推送了消息* 你接收到了* 咱兩的watch就結(jié)束了* 因?yàn)槲乙呀?jīng)觸發(fā)了一次了* 你再次去用的話* 他就不走watch了* * 首先我先讓他watch一下* 然后節(jié)點(diǎn)創(chuàng)建的時(shí)候就觸發(fā)了watch了* 大體上就是這個(gè)意思* 我現(xiàn)在要說(shuō)一個(gè)問(wèn)題是啥啊* 你看咱們正常走的時(shí)候1,2,3,4四個(gè)連接* 有4個(gè)watch* * */this.zk.exists(path, needWatch);/*** 假如創(chuàng)建成功了以后* 打印一句話* 這個(gè)節(jié)點(diǎn)創(chuàng)建成功* 調(diào)用原生API的create方法* * 其實(shí)打印的是這句話* LOG_PREFIX_OF_MAIN前綴是這個(gè)【W(wǎng)atcher-1】* path:/p* path為什么是/p嗎* 之前create返回的就是你創(chuàng)建的一個(gè)路徑* 內(nèi)容是content* 內(nèi)容是對(duì)應(yīng)的value* 為什么要執(zhí)行兩次* 什么時(shí)候執(zhí)行* 什么時(shí)候不執(zhí)行* 你應(yīng)該理解watcher就是一次性的* 第二次不行了* 為什么第二次不行了* 我說(shuō)了兩遍了* node是什么啊* 我一般把p理解為node* 你看API* createPath是我自己封裝的名字* 其實(shí)你可以叫createNode* 一般來(lái)說(shuō)path指的是key* path表示路徑* node表示節(jié)點(diǎn)* 節(jié)點(diǎn)包含key也包含value* 還包含其他的一些值* 我創(chuàng)建一個(gè)節(jié)點(diǎn)叫node* 節(jié)點(diǎn)的一部分叫做path* 你可以理解為他們兩個(gè)是一個(gè)東西* 沒(méi)必要這么去咬文嚼字* * * */System.out.println(LOG_PREFIX_OF_MAIN + "節(jié)點(diǎn)創(chuàng)建成功, Path: " + this.zk.create( /**路徑*/ /*** 節(jié)點(diǎn)的路徑*/path, /*** 節(jié)點(diǎn)路徑對(duì)應(yīng)的內(nèi)容*//**數(shù)據(jù)*/data.getBytes(), /**所有可見(jiàn)*//*** 開(kāi)放式的認(rèn)證*/Ids.OPEN_ACL_UNSAFE, /**永久存儲(chǔ)*//*** 節(jié)點(diǎn)的模式* PERSISTENT的模式*/CreateMode.PERSISTENT ) + /*** 然后把當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)打印一下*/", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}/*** 如果return true的話表示節(jié)點(diǎn)創(chuàng)建成功*/return true;}/*** 讀取指定節(jié)點(diǎn)數(shù)據(jù)內(nèi)容* * 讀方法就是有一個(gè)needWatch* * * @param path 節(jié)點(diǎn)路徑* @return*/public String readData(String path, boolean needWatch) {try {System.out.println("讀取數(shù)據(jù)操作...");/*** getData的時(shí)候就設(shè)置為true了* 設(shè)置成true了* 然后才有的* */return new String(this.zk.getData(path, needWatch, null));} catch (Exception e) {e.printStackTrace();return "";}}/*** 更新指定節(jié)點(diǎn)數(shù)據(jù)內(nèi)容* @param path 節(jié)點(diǎn)路徑* @param data 數(shù)據(jù)內(nèi)容* @return*/public boolean writeData(String path, String data) {try {System.out.println(LOG_PREFIX_OF_MAIN + "更新數(shù)據(jù)成功,path:" + path + ", stat: " +this.zk.setData(path, data.getBytes(), -1));} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 刪除指定節(jié)點(diǎn)* * @param path* 節(jié)點(diǎn)path*/public void deleteNode(String path) {try {this.zk.delete(path, -1);System.out.println(LOG_PREFIX_OF_MAIN + "刪除節(jié)點(diǎn)成功,path:" + path);} catch (Exception e) {e.printStackTrace();}}/*** 判斷指定節(jié)點(diǎn)是否存在* @param path 節(jié)點(diǎn)路徑*/public Stat exists(String path, boolean needWatch) {try {/*** 需要繼續(xù)watch的*/return this.zk.exists(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 獲取子節(jié)點(diǎn)* * 這個(gè)方法里也寫(xiě)的非常的easy* 其實(shí)一點(diǎn)也沒(méi)變* * * @param path 節(jié)點(diǎn)路徑*/private List<String> getChildren(String path, boolean needWatch) {try {System.out.println("讀取子節(jié)點(diǎn)操作...");/*** 就是和原生API保持一致* 原生API就是這么去寫(xiě)的* getChildren里面?zhèn)饕粋€(gè)path* 就是查這個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)有多少個(gè)* getChildren(String path, boolean watch) * 是否需要監(jiān)控子節(jié)點(diǎn)* 昨天其實(shí)我也講了* 你是否要監(jiān)控他的子節(jié)點(diǎn)* 說(shuō)白了你這塊watch* 設(shè)置成false和true的區(qū)別* 這4個(gè)方法都是對(duì)應(yīng)自己的parent* 跟子節(jié)點(diǎn)沒(méi)關(guān)系* 這兒如果寫(xiě)成true了* 子節(jié)點(diǎn)新增的話* 會(huì)觸發(fā)node change這個(gè)事件* 但是是相對(duì)于我這個(gè)parent* 只不過(guò)是我下面加了一個(gè)孩子* 刪除了一個(gè)節(jié)點(diǎn)* 會(huì)觸發(fā)child change* 所以你要理解這個(gè)事情* 現(xiàn)在咱們?nèi)プ鲆幌逻@個(gè)操作* 之前咱們有3次了* 上面講完的都保持不變* */return this.zk.getChildren(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 刪除所有節(jié)點(diǎn)* * 之前是寫(xiě)死的* 但是后來(lái)發(fā)現(xiàn)寫(xiě)死了不太好* 我既然是false的話就不watch了* 先刪除/p/c這個(gè)子節(jié)點(diǎn)之后* 再刪除/p* */public void deleteAllTestPath(boolean needWatch) {/*** CHILDREN_PATH是/p/c*/if(this.exists(CHILDREN_PATH, needWatch) != null){this.deleteNode(CHILDREN_PATH);}/*** PARENT_PATH是/p*/if(this.exists(PARENT_PATH, needWatch) != null){this.deleteNode(PARENT_PATH);} }/*** 收到來(lái)自Server的Watcher通知后的處理。* * process方法我們可以簡(jiǎn)單的讀一下* 他傳遞了一個(gè)事件類型event* 就是節(jié)點(diǎn)發(fā)生變更以后* zookeeper發(fā)生改變了之后* 到底是什么樣的事件* 是通過(guò)event對(duì)象去判斷的* 然后做相應(yīng)的操作*/@Overridepublic void process(WatchedEvent event) {/*** 傳進(jìn)來(lái)以后直接打印event對(duì)象*/System.out.println("進(jìn)入 process 。。。。。event = " + event);try {/*** 然后休眠了200毫秒*/Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}/*** 如果event沒(méi)有獲取到* 干脆就直接return吧*/if (event == null) {return;}// 連接狀態(tài)/*** 首先兩個(gè)狀態(tài)* 第一個(gè)是連接的狀態(tài)* 無(wú)非就是連接成功了還是失敗了還是過(guò)期了* 還是認(rèn)證失敗了* 就是這4個(gè)狀態(tài)* */KeeperState keeperState = event.getState();// 事件類型/*** 然后這個(gè)是最關(guān)鍵的* 就是事件的狀態(tài)* 你的zookeeper事件發(fā)生變化了* 我會(huì)給你返回不同的狀態(tài)的* */EventType eventType = event.getType();// 受影響的path/*** 通過(guò)event可以獲取受影響的路徑是什么* 就是getPath這個(gè)方法* 比如你監(jiān)控的是/p這個(gè)節(jié)點(diǎn)* 然后我對(duì)/p節(jié)點(diǎn)進(jìn)行一個(gè)set操作* 數(shù)據(jù)修改了* 原先的數(shù)據(jù)是1111現(xiàn)在改成2222了* * 受影響的節(jié)點(diǎn)是誰(shuí)啊* 當(dāng)然是p節(jié)點(diǎn)了* 大體上就是這么一個(gè)操作* * */String path = event.getPath();//原子對(duì)象seq 記錄進(jìn)入process的次數(shù)/*** 接下來(lái)有一個(gè)logPrefix* 你現(xiàn)在是進(jìn)入了Watcher線程* 其實(shí)咱們的程序一共有兩個(gè)線程* 第一個(gè)線程是咱們的主線程Main* main方法* 主線程是一直往下走* 第二個(gè)線程就是咱們的process線程* 就是我new出來(lái)的watcher* 其實(shí)就是ZooKeeperWatcher* process它是一直有的* 然后你發(fā)現(xiàn)前綴是【W(wǎng)atcher-這個(gè)名字了* 那就表明是process這個(gè)線程在做一些事* 就是這個(gè)事件被我接收到了* 肯定是會(huì)打印這個(gè)前綴* 然后this.seq就相當(dāng)于一個(gè)序列了* 我到底觸發(fā)了幾次事件* watch到底是watch了幾次* 我可能通過(guò)AtomicInteger進(jìn)行一個(gè)計(jì)數(shù)* 每次計(jì)數(shù)都會(huì)進(jìn)行加1* 最開(kāi)始的時(shí)候沒(méi)有進(jìn)行初始化* 第一次seq就進(jìn)行加1了* 第二次就變成2了* 大體上就是這個(gè)意思* * */String logPrefix = "【W(wǎng)atcher-" + this.seq.incrementAndGet() + "】";/*** 這里面也打印了三個(gè)* 把之前的三個(gè)東西直接print了一下* 首先打印一句白話收到了* 收到了watcher通知了*/System.out.println(logPrefix + "收到Watcher通知");/*** 然后把連接狀態(tài)state打印了一下*/System.out.println(logPrefix + "連接狀態(tài):\t" + keeperState.toString());/*** eventType直接toString了一下* 有一個(gè)很長(zhǎng)的字符串* 你當(dāng)前觸發(fā)變更的事件到底是什么*/System.out.println(logPrefix + "事件類型:\t" + eventType.toString());/*** 通過(guò)你自己寫(xiě)代碼去判斷了* 其實(shí)最外層是這樣* 最外層其實(shí)很簡(jiǎn)單* 監(jiān)控watcher和zookeeper的一個(gè)狀態(tài)* 無(wú)非就是這4個(gè)* 要么連接上* 連接上去這里面做一些其他的事情* 要么就是連接不上* 要么就是認(rèn)證失敗* 要么就是過(guò)期* 這么一看就簡(jiǎn)單了* 到了連接上的時(shí)候* 我這個(gè)數(shù)據(jù)是delete還是update* 數(shù)據(jù)是通過(guò)eventType去做判斷* * */if (KeeperState.SyncConnected == keeperState) {// 成功連接上ZK服務(wù)器/*** eventType一共有幾種* 我們可以看一下* 第一次連接上肯定會(huì)觸發(fā)這個(gè)事件* 然后打印一句話* 表示連接上zookeeper* 剩下的其他的狀態(tài)是咱們要了解的* * 連接成功的時(shí)候至少要走一個(gè)process* 要進(jìn)入一次* */if (EventType.None == eventType) {/*** 當(dāng)前我的client端監(jiān)控了這個(gè)/p節(jié)點(diǎn)* 應(yīng)該是觸發(fā)了什么事件呢* 很明顯觸發(fā)的是Node change的事件*/System.out.println(logPrefix + "成功連接上ZK服務(wù)器");/*** 讓阻塞的繼續(xù)去走*/connectedSemaphore.countDown();} //創(chuàng)建節(jié)點(diǎn)/*** 第一個(gè)是create* */else if (EventType.NodeCreated == eventType) {/*** 打印了一句話* 就是創(chuàng)建了一個(gè)節(jié)點(diǎn)*/System.out.println(logPrefix + "節(jié)點(diǎn)創(chuàng)建");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}} //更新節(jié)點(diǎn)else if (EventType.NodeDataChanged == eventType) {/*** 節(jié)點(diǎn)更新的時(shí)候啥也沒(méi)做* 只是打印了一句話* 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變更了*/System.out.println(logPrefix + "節(jié)點(diǎn)數(shù)據(jù)更新");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}} //更新子節(jié)點(diǎn)/*** 更新子節(jié)點(diǎn)* 如果子節(jié)點(diǎn)發(fā)生變更的話* 我們也打印一句話* 子節(jié)點(diǎn)發(fā)生變更*/else if (EventType.NodeChildrenChanged == eventType) {System.out.println(logPrefix + "子節(jié)點(diǎn)變更");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}} //刪除節(jié)點(diǎn)else if (EventType.NodeDeleted == eventType) {/*** 刪除子節(jié)點(diǎn)的時(shí)候我繼續(xù)打印一句話* 子節(jié)點(diǎn)被刪除* 今天的代碼寫(xiě)的比較簡(jiǎn)潔* 大體上整體的process要做的事情* */System.out.println(logPrefix + "節(jié)點(diǎn) " + path + " 被刪除");}else ;} else if (KeeperState.Disconnected == keeperState) {System.out.println(logPrefix + "與ZK服務(wù)器斷開(kāi)連接");} else if (KeeperState.AuthFailed == keeperState) {System.out.println(logPrefix + "權(quán)限檢查失敗");} else if (KeeperState.Expired == keeperState) {System.out.println(logPrefix + "會(huì)話失效");}else ;System.out.println("--------------------------------------------");}/*** <B>方法名稱:</B>測(cè)試zookeeper監(jiān)控<BR>* <B>概要說(shuō)明:</B>主要測(cè)試watch功能<BR>* * 咱們一點(diǎn)一點(diǎn)看不要著急* * 它會(huì)收到兩次watch* 建立連接會(huì)打印一下* 節(jié)點(diǎn)創(chuàng)建成功主函數(shù)會(huì)打印一下* content就是咱們建立/p的value* 時(shí)間戳我們?cè)谶@里是這么去寫(xiě)的* System.currentTimeMillis()* 你會(huì)發(fā)現(xiàn)現(xiàn)在我們觸發(fā)了兩次process* 第一次肯定會(huì)走連接成功* 進(jìn)來(lái)之后肯定會(huì)第一次觸發(fā)* 然后第二次createPath的時(shí)候* 第一個(gè)節(jié)點(diǎn)的時(shí)候* 里面設(shè)置的是watch等于true* 你既然要watch這個(gè)節(jié)點(diǎn)* 因?yàn)樵蹅兊膠ookeeper去監(jiān)控節(jié)點(diǎn)* 都是一次性的* process他只執(zhí)行一次* 你如果不設(shè)置watch等于true的話* 可能目前沒(méi)有這個(gè)節(jié)點(diǎn)* 你想一想我當(dāng)前做的是什么事* * * * @param args* @throws Exception*/public static void main(String[] args) throws Exception {//建立watcher //當(dāng)前客戶端可以稱為一個(gè)watcher 觀察者角色/*** 創(chuàng)建zookeeper* */ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//創(chuàng)建連接 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());/*** 我在這里休眠了1秒鐘* 有一個(gè)很清晰的效果* */Thread.sleep(1000);// 清理節(jié)點(diǎn)/*** 如果之前有節(jié)點(diǎn)的話* 把它都清掉* 先把這個(gè)代碼注釋掉* 先不用這個(gè)代碼了* */zkWatch.deleteAllTestPath(false);//-----------------第一步: 創(chuàng)建父節(jié)點(diǎn) /p ------------------------///*** 就是一句話* 首先看一下這句胡是什么意思* zookeeper就是默認(rèn)一個(gè)節(jié)點(diǎn)* 是非常干凈的* createPath(String path, String data, boolean needWatch)* 這個(gè)是我自己封裝的* needWatch是否需要watch* 我這里給的true* * 創(chuàng)建一個(gè)parent節(jié)點(diǎn)* 我這里設(shè)置的是true* * 我現(xiàn)在寫(xiě)false了* 我rmr遞歸的去刪除* rmr /p* 然后ls /* 又清空了一次* 然后咱們暫且寫(xiě)true吧* 創(chuàng)建節(jié)點(diǎn)之前* 這是我在創(chuàng)建節(jié)點(diǎn)之前watch這個(gè)parent* 所以說(shuō)我創(chuàng)建節(jié)點(diǎn)的時(shí)候* 才會(huì)有第二次的watch事件* 你要理解這個(gè)事情* * 現(xiàn)在我watch結(jié)束了以后* * */if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {Thread.sleep(1000);//-----------------第二步: 讀取節(jié)點(diǎn) /p 和 讀取/p節(jié)點(diǎn)下的子節(jié)點(diǎn)(getChildren)的區(qū)別 --------------//// 讀取數(shù)據(jù)/*** 為什么在這段代碼中寫(xiě)了這兩句話呢* 其實(shí)這句話我要不寫(xiě)的話* 會(huì)是一個(gè)什么情況呢* 為什么這里要寫(xiě)個(gè)true* 如果我把這段也注釋掉* * 我在這里讀一下* 讀一下它會(huì)有一個(gè)什么效果呢* * 主動(dòng)地watch了一下這個(gè)節(jié)點(diǎn)* 我這回再更新* */
// zkWatch.readData(PARENT_PATH, true);/*** 它會(huì)再次的去更新*/zkWatch.exists(PARENT_PATH, true);// 讀取子節(jié)點(diǎn)(監(jiān)控childNodeChange事件)/*** getChildren的時(shí)候我又進(jìn)行了一次watch* 這個(gè)getChildren又是什么概念呢* * 我要監(jiān)控子節(jié)點(diǎn)的變化* * 假設(shè)我現(xiàn)在寫(xiě)成false了* * 現(xiàn)在又加上這句話* 這哥們又回來(lái)了* 改成true的話* 咱們?cè)谧咭幌? 都是true* 四個(gè)true* 這回就變成5個(gè)了* 為什么是5個(gè)呢* 第一個(gè)節(jié)點(diǎn)創(chuàng)建的時(shí)候不說(shuō)了* 就是建立連接的時(shí)候* 第二個(gè)是createParent的時(shí)候* 第三次是父節(jié)點(diǎn)發(fā)生變更dataChange* 就是parent發(fā)生變更* 第四次是/p/c創(chuàng)建的時(shí)候* 設(shè)置為true就是可以監(jiān)控了* 第五次就是getChildren設(shè)置為true了* 他就會(huì)默認(rèn)的去監(jiān)控* 就是NodeChildrenChange事件觸發(fā)的時(shí)候* 就是parent下面如果有孩子加進(jìn)來(lái)* 那我也要監(jiān)聽(tīng)一下* 那你怎么去做這個(gè)操作呢* 無(wú)非就是要加這個(gè)true* 如果我要是改成false* 就是和沒(méi)寫(xiě)是一樣的* 現(xiàn)在在咱們把這個(gè)測(cè)一下* datachange nodechange* 就是getChildren那一塊* 就是parent節(jié)點(diǎn)發(fā)生變化* 就是狀態(tài)變了* 添加了一個(gè)孩子* 這回能理解這個(gè)意思了吧* 就是你剛才看到我的操作你能理解就行了* 它是始終監(jiān)控父節(jié)點(diǎn)* 但是它是兩個(gè)方法* 父節(jié)點(diǎn)發(fā)生變更* 它會(huì)額外有一個(gè)方法* 它會(huì)永遠(yuǎn)監(jiān)控parent這個(gè)人* 我有一個(gè)額外的事件* 子節(jié)點(diǎn)產(chǎn)生變化了* 他觸發(fā)的還是parent的這個(gè)事件* 這個(gè)事件叫個(gè)名* 什么名呢* Child Node Change* 叫做子節(jié)點(diǎn)變更* 其實(shí)事件變更的還是parent* 跟節(jié)點(diǎn)沒(méi)關(guān)系* 因?yàn)槭录挠|發(fā)者還是parent* 只不過(guò)他額外加了一個(gè)* 你這兩個(gè)節(jié)點(diǎn)沒(méi)有任何關(guān)系的* 但是我昨天說(shuō)難在哪啊* 你這個(gè)子節(jié)點(diǎn)無(wú)論是create還是update還是delete* 父節(jié)點(diǎn)永遠(yuǎn)都是child node change* 就是你沒(méi)法判斷子節(jié)點(diǎn)是新增,刪除還是修改了* 這個(gè)是很惡心的事情* 如果你在工作中你真的想去實(shí)現(xiàn)這個(gè)的話* 你這會(huì)你得下會(huì)功夫* 這個(gè)東西越講越復(fù)雜* 你自己去看* * * */
// zkWatch.getChildren(PARENT_PATH, true);/*** 這里我們變成child* */
// zkWatch.getChildren(CHILDREN_PATH, true);// 更新數(shù)據(jù)/*** 咱們看這個(gè)* 先不考慮子節(jié)點(diǎn)這塊* 直接更新這個(gè)節(jié)點(diǎn)* 看行不行* 看會(huì)不會(huì)觸發(fā)update操作* nodeChange這個(gè)操作* 當(dāng)前我的zookeeper下還是清空的* 一共就觸發(fā)了兩次watch* 10秒鐘以后就結(jié)束了* 第一次是咱們連接的時(shí)候* 第二次是咱們create的時(shí)候* 我去創(chuàng)建節(jié)點(diǎn)的時(shí)候over了* 但是沒(méi)第三次了* 節(jié)點(diǎn)更新了你也沒(méi)有觸發(fā)watch事件* 這是因?yàn)樵蹅儧](méi)有去進(jìn)行watch這個(gè)操作* 并沒(méi)有去進(jìn)行watch這個(gè)操作* 數(shù)據(jù)已經(jīng)更新了* 而且更新成功了* 返回一個(gè)stat* 因?yàn)槟銉梢呀?jīng)發(fā)生一次操作了* 如果非得做這個(gè)事* * */zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");Thread.sleep(1000);// 創(chuàng)建子節(jié)點(diǎn)/*** 創(chuàng)建一個(gè)子節(jié)點(diǎn)* CHILDREN_PATH* 就是在/p下創(chuàng)建一個(gè)c* 寫(xiě)成true或者false是有影響的* 寫(xiě)false會(huì)有一個(gè)什么效果呢* * 當(dāng)我create的時(shí)候事件一點(diǎn)也沒(méi)有觸發(fā)* 原因是你沒(méi)有去監(jiān)控子節(jié)點(diǎn)* 所以還是正常的* 第一次建立連接的時(shí)候* 第二次父節(jié)點(diǎn)parent create的時(shí)候* 第三次就是父節(jié)點(diǎn)發(fā)生變更的時(shí)候* NodeDataChange了一下* 我最后只是單純的去創(chuàng)建了一個(gè)子節(jié)點(diǎn)* 沒(méi)有觸發(fā)任何的watch* 原因是沒(méi)人去監(jiān)控這個(gè)節(jié)點(diǎn)的產(chǎn)生* 所以會(huì)變成這個(gè)事* 然后咱們的zookeeper下還有一個(gè)這個(gè)* rmr /p* 刪掉* 這回多了一個(gè)nodecreated* 這里成true了* * * */zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);// zkWatch.getChildren(CHILDREN_PATH, true);//-----------------第三步: 建立子節(jié)點(diǎn)的觸發(fā) --------------///*** 相當(dāng)于遞歸的去創(chuàng)建* * 現(xiàn)在有加兩個(gè)事件* 現(xiàn)在有6次了* 就是有2次create* 因?yàn)槲疫@里創(chuàng)建c1成功c2成功的時(shí)候* 多加了兩次* NodeCreate是誰(shuí)啊* 是這個(gè)/p/c/c1* 是這個(gè)/p/c/c1/c2* 如果你想在/p/c這個(gè)節(jié)點(diǎn)下監(jiān)控* 監(jiān)控他的子節(jié)點(diǎn)發(fā)生變化的時(shí)候* 你是不是還得getChildren* 怎么去寫(xiě)啊* 寫(xiě)起來(lái)就有點(diǎn)麻煩了* 這塊我之前設(shè)置成false* * */zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);//-----------------第四步: 更新子節(jié)點(diǎn)數(shù)據(jù)的觸發(fā) --------------////在進(jìn)行修改之前,我們需要watch一下這個(gè)節(jié)點(diǎn):
// Thread.sleep(1000);/*** 這里是對(duì)children path的一些修改*/
// zkWatch.readData(CHILDREN_PATH, true);
// zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");}// Thread.sleep(10000);// 清理節(jié)點(diǎn)/*** 清空節(jié)點(diǎn)咱們先注釋掉* 咱們要手動(dòng)的去清空*/
// zkWatch.deleteAllTestPath(false);/*** 等了10秒鐘以后我就釋放連接了*/Thread.sleep(5000);/*** 釋放連接咱們先保留* */zkWatch.releaseConnection();}}
?
總結(jié)
以上是生活随笔為你收集整理的Zookeeper_watch机制核心讲解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Zookeeper_原生API操作(二)
- 下一篇: Zookeeper_安全认证讲解