日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

大数据之Zookeeper

發布時間:2023/12/14 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据之Zookeeper 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 1. Zookeeper 入門
    • 1.1 概述
    • 1.2 特點
    • 1.3 數據結構
    • 1.4 應用場景
  • 2. Zookeeper 安裝
    • 2.1 下載地址
    • 2.2 本地模式安裝部署
    • 2.3 分布式安裝部署
    • 2.4 配置參數解讀
  • 3. Zookeeper 內部原理
    • 3.1 選舉機制
    • 3.2 節點類型
    • 3.3 Stat 結構體
    • 3.4 監聽器原理
    • 3.5 寫數據流程
  • 4. Zookeeper 實戰
    • 4.1 客戶端命令行操作
    • 4.2 API 操作
      • 4.3.1 IDEA 環境搭建
      • 4.3.2 創建 ZooKeeper 客戶端
      • 4.3.4 獲取子節點并監聽節點變化
      • 4.3.5 判斷 Znode 是否存在
    • 4.3 監聽服務器節點動態上下線案例
  • 5 zookeeper框架
    • 5.1 org.apache.zookeeper
    • 5.2 zkclient
      • 5.2.1 簡介
      • 5.2.2 Maven依賴
      • 5.2.3 ZkClient 的設計
      • 5.2.4 重要處理流程說明
      • 5.2.5 客戶端處理變更(Watcher通知)
      • 5.2.6 序列化處理
      • 5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
      • 5.2.8 API介紹
      • 5.2.9 demo
    • 5.3 Curator
      • 5.3.1 簡介
      • 5.3.2 版本問題
      • 5.3.3 CuratorFramework
      • 5.3.4 curator-recipes
      • 5.3.5 知識點
      • 5.3.6 Maven依賴
      • 5.3.7 api
      • 5.3.8 使用Curator高級API特性之Cache緩存監控節點變化
  • 5.4 使用Curator創建/驗證ACL(訪問權限列表)
      • 5.4.1 連通Zk時,就指定登錄權限
      • 5.4.2寫一個把明文的賬號密碼轉換為加密后的密文的工具類
      • 5.4.3使用自定義工具類AclUtils,一次性給多個用戶賦Acl權限
      • 5.4.4級聯創建節點,并賦予節點操作權限
      • 5.4.5讀取節點數據
      • 5.4.6修改具有ACL權限節點的data數據
      • 5.4.7兩種方法判斷node節點是否存(優先使用第一種)
  • 7 分布式鎖
    • 7.1.重入式排它鎖InterProcessMutex
    • 7.2.不可重入排它鎖InterProcessSemaphoreMutex
    • 7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock
    • 7.4.多鎖對象容器(多共享鎖) ,將多個鎖作為單個實體管理,InterProcessMultiLock、InterProcessLock
    • 7.5.代碼
  • 8.分布式計數器

1. Zookeeper 入門

1.1 概述

Zookeeper 是一個開源的分布式的,為分布式應用提供協調服務的 Apache 項目。
??Zookeeper 從設計模式角度來理解:是一個基于觀案者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應。
??

1.2 特點

1)Zookeeper:一個領導者(Leader) ,多個跟隨者(Follower)組成的集群。
2)集群中只要有半數以上節點存活,Zookeeper 集群就能正常服務。
3)全局數據一致:每個 Server 保存一份相同的數據副本,Client 無論連接到哪個 Server,數據都是一致的。
4)更新請求順序進行,來自同一個 Client 的更新請求按其發送順序依次執行。
5)數據更新原子性,一次數據更新要么成功,要么失敗。
6)實時性,在一定時間范圍內,Client 能讀到最新數據。

1.3 數據結構

ZooKeeper 數據模型的結構與 Unix 文件系統很類似,整體上可以看作是一棵樹,每個節點稱做一個 ZNode。每一個 ZNode 默認能夠存儲 1 MB 的數據,每個 ZNode 都可以通過其路徑唯一標識。

1.4 應用場景

提供的服務包括:統一命名服務、統一配置管理、統一集群管理、服務器節點動態上下線、軟負載均衡等。
統一命名服務

在分布式環境下,經常需要對應用/服務進行統一命名 ,便于識別。例如:IP 不容易記住,而域名容易記住。
統一配置管理
(1)分布式環境下,配置文件同步非常常見。
??① 一般要求一個集群中,所有節點的配置信息是一致的,比如 Kafka 集群。
??② 對配置文件修改后,希望能夠快速同步到各個節點上。
(2)配置管理可交由 ZooKeeper 實現。
??① 可將配置信息寫入 ZooKeeper 上的一個 Znode 。
??② 各個客戶端服務器監聽這個 Znode。
??③ 一旦 Znode 中的數據被修改,ZooKeeper 將通知各個客戶端服務器。

統一集群管理
(1)分布式環境中,實時掌握每個節點的狀態是必要的。
??可根據節點實時狀態做出一些調整。
(2)ZooKeeper 可以實現實時監控節點狀態變化
??① 可將節點信息寫入Z ooKeeper 上的一個 ZNode。
??② 監聽這個 ZNode 可獲取它的實時狀態變化。
服務器動態上下線

軟負載均衡

在 Zookeeper 中記錄每臺服務器的訪問數,讓訪問數最少的服務器去處理最新的客戶端請求。

2. Zookeeper 安裝

2.1 下載地址

zookeeper 官網

2.2 本地模式安裝部署

準備工作

tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/ mv apache-zookeeper-3.5.6-bin/ zookeeper mv zoo_sample.cfg zoo.cfg mkdir -p /usr/local/zookeeper/datavim zoo.cfg dataDir=/usr/local/zookeeper/data

vim /etc/profile
在配置文件中添加以下內容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
source /etc/profile

啟動 Zookeeper
zkServer.sh start

啟動客戶端
zkCli.sh
退出客戶端
quit
停止 Zookeeper
zkServer.sh stop

2.3 分布式安裝部署

集群規劃
在 master、slave1 和 slave2 三個節點上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目錄內容到 slave1、slave2

xsync zookeeper/
配置服務器編號
① 在 /usr/local/zookeeper/ 這個目錄下創建 zkData
mkdir data
② /usr/local/zookeeper/data 目錄下創建一個 myid 的文件
touch myid
③ 編輯 myid 文件
vim myid
在文件中添加與 server 對應的編號:
0
④ 分發到其他機器上
xsync myid
并分別在 slave1、slave2 上修改 myid 文件中內容為 1、2

配置 zoo.cfg 文件
① 將 /usr/local/zookeeper/conf 這個路徑下的 zoo_sample.cfg 修改為 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打開 zoo.cfg 文件,修改 dataDir 路徑
dataDir=/usr/local/zookeeper/data

增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg

修改環境變量
① 打開配置文件
vim /etc/profile
② 在配置文件中添加以下內容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三臺機器)
source /etc/profile

集群操作
① 三臺機器分別啟動 Zookeeper
zkServer.sh start
② 三臺機器分別關閉 Zookeeper
zkServer.sh stop
編寫 Zookeeper 的群起群關腳本

① 在 /usr/local/bin 目錄下創建 zk 文件
vim zk.sh

#!/bin/bash case $1 in "start"){for i in master slave1 slave2doecho "****************** $i *********************"ssh $i "source /etc/profile && zkServer.sh start"done };;"stop"){for i in master slave1 slave2doecho "****************** $i *********************"ssh $i "source /etc/profile && zkServer.sh stop"done };;esac

修改腳本 zk 具有執行權限
chmod 777 zk.sh
調用腳本形式:zk start 或 zk stop

2.4 配置參數解讀

Zookeeper 中的配置文件 zoo.cfg 中參數含義解讀如下:
tickTime =2000:通信心跳數,Zookeeper 服務器與客戶端心跳時間,單位毫秒
Zookeeper 使用的基本時間,服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個tickTime 時間就會發送一個心跳,時間單位為毫秒。它用于心跳機制,并且設置最小的 session 超時時間為兩倍心跳時間。(session 的最小超時時間是 2*tickTime)

initLimit =10:LF 初始通信時限
集群中的 Follower 跟隨者服務器與 Leader 領導者服務器之間初始連接時能容忍的最多心跳數(tickTime的數量),用它來限定集群中的 Zookeeper 服務器連接到 Leader 的時限。

syncLimit =5:LF 同步通信時限
集群中 Leader 與 Follower 之間的最大響應時間單位,假如響應超過 syncLimit * tickTime,Leader 認為 Follwer 死掉,從服務器列表中刪除 Follwer。

dataDir:數據文件目錄+數據持久化路徑
主要用于保存 Zookeeper 中的數據。

clientPort =2181:客戶端連接端口
監聽客戶端連接的端口。

server.A=B:C:D
A 是一個數字,表示這個是第幾號服務器;集群模式下配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件里面有一個數據就是 A 的值,Zookeeper 啟動時讀取此文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個server。
B 是這個服務器的 ip 地址;
C 是這個服務器與集群中的 Leader 服務器交換信息的端口;
D 是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。

3. Zookeeper 內部原理

3.1 選舉機制

半數機制

集群中半數以上機器存活,集群可用。所以 Zookeeper 適合安裝奇數臺服務器。

Zookeeper 雖然在配置文件中并沒有指定 Master 和 Slave。但是,Zookeeper 工作時,是有一個節點為 Leader,其他則為 Follower,Leader 是通過內部的選舉機制臨時產生的。

選舉過程例子

假設有五臺服務器組成的 Zookeeper 集群,它們的 id 從1-5,同時它們都是最新啟動的,也就是沒有歷史數據,在存放數據量這一點上,都是一樣的。假設這些服務器依序啟動。

① 服務器 1 啟動,此時只有它一臺服務器啟動了,它發出去的報文沒有任何響應,所以它的選舉狀態一直是 LOOKING 狀態。

② 服務器 2 啟動,它與最開始啟動的服務器 1 進行通信,互相交換自己的選舉結果,由于兩者都沒有歷史數據,所以 id 值較大的服務器 2 勝出,但是由于沒有達到超過半數以上的服務器都同意選舉它(這個例子中的半數以上是 3),所以服務器 1、2 還是繼續保持 LOOKING 狀態。

③ 服務器 3 啟動,根據前面的理論分析,服務器 3 成為服務器 1、2、3 中的老大,而與上面不同的是,此時有三臺服務器選舉了它,所以它成為了這次選舉的 Leader。

④ 服務器 4 啟動,根據前面的分析,理論上服務器4應該是服務器 1、2、3、4 中最大的,但是由于前面已經有半數以上的服務器選舉了服務器 3,所以它只能接收當小弟的命了。

⑤ 服務器 5 啟動,同 4 一樣當小弟。

3.2 節點類型

持久(Persistent)

客戶端和服務器端斷開連接后,創建的節點不刪除

短暫(Ephemeral)

客戶端和服務器端斷開連接后,創建的節點自己刪除

節點類型

① 持久化目錄節點

客戶端與 Zookeeper 斷開連接后,該節點依舊存在。

② 持久化順序編號目錄節點

客戶端與 Zookeeper 斷開連接后,該節點依舊存在,只是 Zookeeper 給該節點名稱進行順序編號

③ 臨時目錄節點

客戶端與 Zookeeper 斷開連接后,該節點被刪除

④ 臨時順序編號目錄節點

客戶端與 Zookeeper 斷開連接后,該節點被刪除,只是 Zookeeper 給該節點名稱進行順序編號。

說明: 創建 znode 時設置順序標識,znode 名稱后會附加一個值,順序號是一個單調遞增的計數器,由父節點維護。
注意: 在分布式系統中,順序號可以被用于為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序。

3.3 Stat 結構體

czxid: 創建節點的事務 zxid

每次修改 ZooKeeper 狀態都會收到一個 zxid 形式的時間戳,也就是 ZooKeepe r事務 ID。
事務 ID 是 ZooKeeper 中所有修改總的次序。每個修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前發生。

ctime: znode 被創建的毫秒數(從 1970 年開始)

mzxid: znode 最后更新的事務 zxid

mtime: znode 最后修改的毫秒數(從 1970 年開始)

pZxid: znode 最后更新的子節點 zxid

cversion : znode 子節點變化號,znode 子節點修改次數

dataversion: znode 數據變化號

aclVersion: znode 訪問控制列表的變化號

ephemeralOwner: 如果是臨時節點,這個是 znode 擁有者的 session id。如果不是臨時節點則是 0。

dataLength: znode 的數據長度

numChildren: znode 子節點數量

3.4 監聽器原理

監聽原理詳解:

① 首先要有一個 main() 線程
② 在 main 線程中創建 Zokeeper 客戶端,這時就會創建兩個線程,一個負責網絡連接通信(connet),一個負責監聽(listener) 。
③ 通過 connect 線程將注冊的監聽事件發送給 Zookeeper。
④ 在 Zookeeper 的注冊監聽器列表中將注冊的監聽事件添加到列表中。
⑤ Zookeeper 監聽到有數據或路徑變化,就會將這個消息發送給 listener 線程。
⑥ listener 線程內部調用了 process() 方法。

常見的監聽
① 監聽節點數據的變化
get -w path
② 監聽子節點增減的變化
ls -w path

3.5 寫數據流程

4. Zookeeper 實戰

4.1 客戶端命令行操作

啟動客戶端
zkCli.sh
顯示所有操作命令
help
查看當前 znode 中所包含的內容
ls /
ls2 /
查看當前節點詳細數據
ls -s /

分別創建 2 個普通節點
create /animals “dog”
create /animals/small “ant”
獲得節點的值
get /animals
get /animals/small

創建短暫節點
create -e /animals/big “elephant”
創建帶序號的節點
create -s /animals/middle “hourse”

修改節點數據值
set /animals/small “bug”
節點的值變化監聽
① 在 slave1 主機上注冊監聽 /animals 節點數據變化
get -w /animals
② 在 slave2 主機上修改 /animals 節點的數據
set /animals “cat”
③ 觀察 slave1 主機收到子節點變化的監聽

節點的子節點變化監聽(路徑變化)
① 在 slave1 主機上注冊監聽 /animals 節點的子節點變化
ls -w /animals
② 在 slave2 主機 /animals 節點上創建子節點
create /animals/mini “fly”
③ 觀察 slave1 主機收到子節點變化的監聽

刪除節點
delete /animals/big
遞歸刪除節點
deleteall /animals/mini
查看節點狀態
stat /animals

4.2 API 操作

4.3.1 IDEA 環境搭建

創建一個 Maven 工程
在 pom 文件中添加依賴

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.9</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency></dependencies>

在項目的 src/main/resources 目錄下,新建一個文件,命名為 “log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.3.2 創建 ZooKeeper 客戶端

@SpringBootTest
public class ZookeeperTest {

private static String connectString = "localhost:2181"; private static int sessionTimeout = 2000; private static ZooKeeper zkClient;@Test public static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}}); }

}

4.3.3 創建子節點
先將上面的 init() 方法前面的注解 @Test 改為 @BeforeAll

// 創建子節點

@SpringBootTest public class ZookeeperTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}});}@Testpublic void createNode() throws Exception { // 參數1:要創建的節點的路徑; 參數2:節點數據 ; 參數3:節點權限 ;參數4:節點的類型String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(path);} }

4.3.4 獲取子節點并監聽節點變化

// 獲取子節點并監聽節點變化 @SpringBootTest public class WatchTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@Testpublic void getChildrenAndWatch() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}// 延時阻塞Thread.sleep(Long.MAX_VALUE);}@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {List<String> children = null;try {children = zkClient.getChildren("/", true);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}for (String child : children) {System.out.println(child);}}});} }

4.3.5 判斷 Znode 是否存在

// 判斷znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}

4.3 監聽服務器節點動態上下線案例

需求

某分布式系統中,主節點可以有多臺,可以動態上下線,任意一臺客戶端都能實時感知到主節點服務器的上下線。

需求分析

代碼實現

① 先在集群上創建 /servers 節點
create /servers “servers”
② 服務器端向 Zookeeper 注冊代碼

package zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {args = new String[]{"slave1"};DistributeServer server = new DistributeServer();// 1.連接zookeeper集群server.getConnect();// 2.注冊節點server.register(args[0]);// 3.業務邏輯處理server.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}}); }private void register(String hostname) throws KeeperException, InterruptedException {String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " is online"); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }

}

③ 客戶端代碼

package zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {DistributeClient client = new DistributeClient();// 1.連接zookeeper集群client.getConnect();// 2.注冊監聽client.getChildren();// 3.業務邏輯處理client.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {try {getChildren();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}); }private void getChildren() throws KeeperException, InterruptedException {List<String> children = zkClient.getChildren("/servers", true);// 存儲服務器節點主機名稱集合ArrayList<String> hosts = new ArrayList<String>();for (String child : children) {byte[] data = zkClient.getData("/servers/" + child, false, null);hosts.add(new String(data));}System.out.println(hosts); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }

}

5 zookeeper框架

5.1 org.apache.zookeeper

5.2 zkclient

5.2.1 簡介

ZkClient 是由 Datameer 的工程師開發的開源客戶端,對 Zookeeper 的原生 API 進行了包裝,實現了超時重連、Watcher 反復注冊等功能。
在使用 ZooKeeper 的 Java 客戶端時,經常需要處理幾個問題:重復注冊 watcher、session失效重連、異常處理。
IZKConnection:是一個ZkClient與Zookeeper之間的一個適配器;在代碼里直接使用的是ZKClient,實質上還是委托了zookeeper來處理了。

在ZKClient中,根據事件類型,分為

節點事件(數據事件),對應的事件處理器是IZKDataListener;
子節點事件,對應的事件處理器是IZKChildListener;
Session事件,對應的事件處理器是IZKStatusListener;
ZkEventThread:是專門用來處理事件的線程

目前已經運用到了很多項目中,知名的有 Dubbo、Kafka、Helix。

5.2.2 Maven依賴

<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version> </dependency>

5.2.3 ZkClient 的設計

從上述結構上看,IZKConnection 是一個 ZkClient 與 ZooKeeper 之間的一個適配器。在代碼里直接使用的是 ZKClient,其實質還是委托了 zookeeper 來處理了。

使用 ZooKeeper 客戶端來注冊 watcher 有幾種方法: 1、創建 ZooKeeper 對象時指定默認的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注冊的是某個節點的事件處理器(watcher),getchildren 注冊的是子節點的事件處理器(watcher)。而在 ZKClient 中,根據事件類型,分為了節點事件(數據事件)、子節點事件。對應的事件處理器則是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相關的事件和事件處理器。

ZkEventThread 是專門用來處理事件的線程。

5.2.4 重要處理流程說明

啟動 ZKClient

在創建 ZKClient 對象時,就完成了到 ZooKeeper 服務器連接的建立。具體過程是這樣的:

啟動時,指定好 connection string,連接超時時間,序列化工具等。
創建并啟動 eventThread,用于接收事件,并調度事件監聽器 Listener 的執行。
連接到 zookeeper 服務器,同時將 ZKClient 自身作為默認的 Watcher。
為節點注冊Watcher:

ZooKeeper 的三個方法:getData、getChildren、exists.

ZKClient 都提供了相應的代理方法。就拿 exists 來看:

可以看到,是否注冊 watcher,由 hasListeners(path)來決定的。

hasListeners 就是看有沒有與該數據節點綁定的 listener。

所以,默認情況下,都會自動的為指定的 path 注冊 watcher,并且是默認的 watcher (ZKClient)。怎么才能讓 hasListeners 判定值為 true 呢,也就是怎么才能為 path 綁定 Listener 呢?
ZKClient提供了訂閱功能:

一個新建的會話,只需要在取得響應的數據節點后,調用 subscribteXxx 就可以訂閱上相應的事件了。

5.2.5 客戶端處理變更(Watcher通知)

前面已經知道,ZKClient 是默認的 Watcher,并且在為各個數據節點注冊的 Watcher 都是這個默認的 Watcher。那么該是如何將各種事件通知給相應的 Listener 呢?

處理過程大致可以概括為下面的步驟:

判斷變更類型:變更類型分為 State 變更、ChildNode 變更(創建子節點、刪除子節點、修改子節點數據)、NodeData 變更(創建指定 node,刪除節點,節點數據變更)。
取出與 path 關聯的 Listeners,并為每一個 Listener 創建一個 ZKEvent,將 ZkEvent 交給 ZkEventThread 處理。
ZkEventThread 線程,拿到 ZkEvent 后,只需要調用 ZkEvent 的 run 方法進行處理。 從這里也可以知道,具體的怎么如何調用 Listener,還要依賴于 ZkEvent 的 run()實現了。
注冊監聽 watcher:

接口類注冊監聽方法解除監聽方法
IZkChildListener(子節點)ZkClient的subscribeChildChanges方法ZkClient 的unsubscribeChildChanges 方法
IZkDataListener(數據)ZkClient 的subscribeDataChanges 方法ZkClient 的 unsubscribeDataChanges 方法
IZkStateListener(客戶端狀 態)ZkClient 的 subscribeStateChanges 方 法ZkClient 的 unsubscribeStateChanges 方法

在 ZkClient 中客戶端可以通過注冊相關的事件監聽來實現對 Zookeeper 服務端時間的訂閱。

其中 ZkClient 提供的監聽事件接口有以下幾種:

其中 ZkClient 還提供了一個 unsubscribeAll 方法,來解除所有監聽。

Zookeeper 中提供的變更操作有:節點的創建、刪除,節點數據的修改:

創建操作,數據節點分為四種,ZKClient 分別為他們提供了相應的代理:

刪除節點的操作:

修改節點數據的操作:

writeDataReturnStat():寫數據并返回數據的狀態。
updateDataSerialized():修改已序列化的數據。執行過程是:先讀取數據,然后使用DataUpdater 對數據修改,最后調用 writeData 將修改后的數據發送給服務端。

5.2.6 序列化處理

ZooKeeper 中,會涉及到序列化、反序列化的操作有兩種:getData、setData。在 ZKClient 中,分別用 readData、writeData 來替代了。

對于 readData:先調用 zookeeper 的 getData,然后進行使用 ZKSerializer 進行反序列化工 作。

對于 writeData:先使用 ZKSerializer 將對象序列化后,再調用 zookeeper 的 setData。

5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?

Watcher 自動重注冊:這個要是依賴于 hasListeners()的判斷,來決定是否再次注冊。如果對此有不清晰的,可以看上面的流程處理的說明。
Session 失效重連:如果發現會話過期,就先關閉已有連接,再重新建立連接。
異常處理:對比 ZooKeeper 和 ZKClient,就可以發現 ZooKeeper 的所有操作都是拋異常 的,而 ZKClient 的所有操作,都不會拋異常的。在發生異常時,它或做日志,或返回空, 或做相應的 Listener 調用。
相比于 ZooKeeper 官方客戶端,使用 ZKClient 時,只需要關注實際的 Listener 實現即可。所 以這個客戶端,還是推薦大家使用的。
https://www.cnblogs.com/jinchengll/p/12333213.html

5.2.8 API介紹

啟動ZKClient:在創建ZKClient對象時,就完成了到ZooKeeper服務器連接的建立
1、啟動時,制定好connection string,連接超時時間,序列化工具等
2、創建并啟動eventThread,用于接收事件,并調度事件監聽器Listener的執行
3、連接到Zookeeper服務器,同時將ZKClient自身作為默認的Watcher

為節點注冊Watcher
Zookeeper 原始API的三個方法:getData,getChildren、exists,ZKClient都提供了相應的代理方法,比如exists,

hasListeners是看有沒有與該數據節點綁定的listener


所以,默認情況下,都會自動的為指定的path注冊watcher,并且是默認的watcher(ZKClient),那么怎樣才能讓hasListeners值為true呢,也就是怎么才能為path綁定Listener呢?
ZKClient提供了訂閱功能,一個新建的會話,只需要在取得響應的數據節點后,調用subscribeXXX就可以訂閱上相應的事件了。

5.2.9 demo

  • createParents可以遞歸創建節點(public void createPersistent(String path, boolean createParents))
  • 無需注冊watcher(前面也說了,ZKClient幫我們做好了)
  • 節點內容可以傳任意類型數據
  • 可以自定義內容的序列化和反序列化
  • 在沒指定zkSerializer時,默認使用java自動的序列化和反序列化
  • public class ZkClientCrud<T> {ZkClient zkClient ;final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);public ZkClientCrud(ZkSerializer zkSerializer) {logger.info("鏈接zk開始");// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);}public void createEphemeral(String path,Object data){zkClient.createEphemeral(path,data);}/**** 支持創建遞歸方式* @param path* @param createParents*/public void createPersistent(String path,boolean createParents){zkClient.createPersistent(path,createParents);}/**** 創建節點 跟上data數據* @param path* @param data*/public void createPersistent(String path,Object data){zkClient.createPersistent(path,data);}/**** 子節點* @param path* @return*/public List<String> getChildren(String path){return zkClient.getChildren(path);}public T readData(String path){return zkClient.readData(path);}public void writeData(String path,Object data){zkClient.writeData(path,data);}//遞歸刪除public void deleteRecursive(String path){zkClient.deleteRecursive(path);} } public class ZkClientCrudTest {final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);public static void main(String[] args) {ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());String path="/root";zkClientCrud.deleteRecursive(path);zkClientCrud.createPersistent(path,"hi");/* zkClientCrud.createPersistent(path+"/a/b/c",true);//遞歸創建 但是不能設在value//zkClientCrud.createPersistent(path,"hi");logger.info(zkClientCrud.readData(path));//更新zkClientCrud.writeData(path,"hello");logger.info(zkClientCrud.readData(path));logger.info(String.valueOf(zkClientCrud.getChildren(path)));//子節點List<String> list=zkClientCrud.getChildren(path);for(String child:list){logger.info("子節點:"+child);}*/User user=new User();user.setId(1);user.setName("張三");zkClientCrud.writeData(path,user);System.out.println(zkClientCrud.readData(path).getName());;} } @Data @NoArgsConstructor @AllArgsConstructor public class User implements Serializable {private Integer id;private String name; }

    Watcher

    public class ZkClientWatcher {ZkClient zkClient;public ZkClientWatcher() {zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);}public void createPersistent(String path, Object data) {zkClient.createPersistent(path, data);}public void writeData(String path, Object object) {zkClient.writeData(path, object);}public void delete(String path) {zkClient.delete(path);}public boolean exists(String path) {return zkClient.exists(path);}public void deleteRecursive(String path) {zkClient.deleteRecursive(path);}//對父節點添加監聽數據變化。public void subscribe(String path) {zkClient.subscribeDataChanges(path, new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {System.out.printf("變更的節點為:%s,數據:%s\r\n", dataPath, data);}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {System.out.printf("刪除的節點為:%s\r\n", dataPath);}});}//對父節點添加監聽子節點變化。public void subscribe2(String path) {zkClient.subscribeChildChanges(path, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("父節點: " + parentPath + ",子節點:" + currentChilds + "\r\n");}});}//客戶端狀態public void subscribe3(String path) {zkClient.subscribeStateChanges(new IZkStateListener() {@Overridepublic void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {if (state == Watcher.Event.KeeperState.SyncConnected) {//當我重新啟動后start,監聽觸發System.out.println("連接成功");} else if (state == Watcher.Event.KeeperState.Disconnected) {System.out.println("連接斷開");//當我在服務端將zk服務stop時,監聽觸發} elseSystem.out.println("其他狀態" + state);}@Overridepublic void handleNewSession() throws Exception {System.out.println("重建session");}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {}});}/* @Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {}*/ } public class ZkClientWatcherTest {public static void main(String[] args) throws InterruptedException {ZkClientWatcher zkClientWatche=new ZkClientWatcher();String path="/root";zkClientWatche.deleteRecursive(path);zkClientWatche.createPersistent(path,"hello");zkClientWatche.subscribe(path);zkClientWatche.subscribe2(path);// zkClientWatche.subscribe3(path);//需要啟服務// Thread.sleep(Integer.MAX_VALUE);zkClientWatche.createPersistent(path+"/root2","word");TimeUnit.SECONDS.sleep(1);zkClientWatche.writeData(path,"hi");TimeUnit.SECONDS.sleep(1);//zkClientWatche.delete(path);//如果目錄下有內容 不能刪除 會報 Directory not empty for /root的異常zkClientWatche.deleteRecursive(path);TimeUnit.SECONDS.sleep(1); //這個main線程就結束} } public class ZookeeperUtil {/** zookeeper服務器地址 */ // public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";public static final String connectString = "localhost:2181";/** 定義session失效時間 */public static final int sessionTimeout = 5000;public static final String path = "/root"; }

    5.3 Curator

    5.3.1 簡介

    zookeeper不是為高可用性設計的,但它使用ZAB協議達到了極高的一致性。所以它經常被選作注冊中心、配置中心、分布式鎖等場景。
    它的性能是非常有限的,而且API并不是那么好用。xjjdog傾向于使用基于Raft協議的Etcd或者Consul,它們更加輕量級一些。
    Curator是netflix公司開源的一套zookeeper客戶端,目前是Apache的頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。Curator解決了很多zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊wathcer和NodeExistsException 異常等。

    Zookeeper 原生API問題:
    1.超時重連,不支持自動,需要手動操作
    2.Watch注冊一次后會失效
    3.不支持遞歸創建節點

    Zookeeper API 升級版 Curator:
    1.解決watcher的注冊一次就失效
    2.提供更多解決方案并且實現簡單
    3.提供常用的ZooKeeper工具類
    4.編程風格更爽,點點點就可以了
    5.可以遞歸創建節點等

    Curator由一系列的模塊構成,對于一般開發者而言,常用的是curator-framework和curator-recipes。

    5.3.2 版本問題

    Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
    Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些諸如動態重新配置、watch刪除等新特性。
    Curator4 統一對 ZooKeeper 3.4.x 和 3.5.x 的支持

    5.3.3 CuratorFramework

    Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自動連接管理:
    當ZooKeeper客戶端內部出現異常, 將自動進行重連或重試, 該過程對外幾乎完全透明
    監控節點數據變化事件NodeDataChanged,需要時調用updateServerList()方法
    Curator recipes自動移除監控

    更加清晰的API
    簡化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes實現 : 選舉,共享鎖, 路徑cache, 分布式隊列,分布式優先隊列等。

    5.3.4 curator-recipes

    curator-recipes:封裝了一些高級特性,如:Cache事件監聽、 Elections選舉、分布式鎖、分布式計數器、分布式Barrier、Queues隊列等

    5.3.5 知識點

    1.使用curator建立與zk的連接
    2.使用curator添加/遞歸添加節點
    3.使用curator刪除/遞歸刪除節點
    4.使用curator創建/驗證 ACL(訪問權限列表)
    5.使用curator監聽 單個/父 節點的變化(watch事件)
    6.基于curator實現Zookeeper分布式鎖(需要掌握基本的多線程知識)
    7.基于curator實現分布式計數器

    5.3.6 Maven依賴

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><!--建議和本地安裝版本保持一致--><version>3.7.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version> </dependency>

    5.3.7 api

    import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class ZkConnectCuratorUtil {final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);public CuratorFramework zkClient = null; //zk的客戶端工具Curator(在本類通過new實例化的是,自動start)private static final int MAX_RETRY_TIMES = 3; //定義失敗重試次數private static final int BASE_SLEEP_TIME_MS = 5000; //連接失敗后,再次重試的間隔時間 單位:毫秒private static final int SESSION_TIME_OUT = 1000000; //會話存活時間,根據業務靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服務所在的IP和客戶端端口private static final String NAMESPACE = "workspace";//指定后,默認操作的所有的節點都會在該工作空間下進行//本類通過new ZkCuratorUtil()時,自動連通zkClientpublic ZkConnectCuratorUtil() {RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次連接失敗后,重試策略zkClient = CuratorFrameworkFactory.builder()//.authorization("digest", "root:root".getBytes())//登錄超級管理(需單獨配).connectString(ZK_SERVER_IP_PORT).sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();zkClient.start();}public void closeZKClient() {if (zkClient != null) {this.zkClient.close();}}public static void main(String[] args) {ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();boolean ifStarted=zkUtil.zkClient.isStarted();System.out.println("當前客戶的狀態:" + (ifStarted ? "連接中" : "已關閉"));zkUtil.closeZKClient();boolean ifClose = zkUtil.zkClient.isStarted();System.out.println("當前客戶的狀態:" + (ifClose ? "連接成功" : "已關閉"));} } public class CuratorDao {//使用curator(遞歸)添加節點//級聯創建節點(原生API不支持/后臺客戶端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//創建父節點,如果需要的話.withMode(CreateMode.PERSISTENT) //指定節點是臨時的,還是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定節點的操作權限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "節點已成功創建…");}//使用curator(遞歸)刪除節點//刪除node節點及其子節點public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed() //保證刪除:如果刪除失敗,那么在后端還是繼續會刪除,直到成功.deletingChildrenIfNeeded() //級聯刪除子節點//.withVersion(1)//版本號可以據需使用.forPath(nodePath);System.out.println(nodePath + "節點已刪除成功…");}//使用curator更新節點數據//更新節點data數據public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本號據需使用,默認可以不帶System.out.println(nodePath + "節點數據已修改成功…");}//使用curator查詢節點數據//查詢node節點數據public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("節點" + nodePath + "的數據為" + new String(data));System.out.println("節點的版本號為:" + stat.getVersion());}//使用curator查詢節點的子節點//打印node子節點public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("開始打印子節點");for (String str : childNodes) {System.out.println(str);}}//使用curator判斷節點是否存在//判斷node節點是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "節點不存在" : "節點存在");}/**************使用Curator高級API特性之Cache緩存監控節點變化*************/@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}}

    5.3.8 使用Curator高級API特性之Cache緩存監控節點變化

    cache是一種緩存機制,可以借助cache實現監聽。
    簡單來說,cache在客戶端緩存了znode的各種狀態,當感知到zk集群的znode狀態變化,會觸發event事件,注冊的監聽器會處理這些事件。
    curator支持的cache種類有4種Path Cache,Node Cache,Tree Cache,Curator Cache
    1)Path Cache
    Path Cache用來觀察ZNode的子節點并緩存狀態,如果ZNode的子節點被創建,更新或者刪除,那么Path Cache會更新緩存,并且觸發事件給注冊的監聽器。
    它是通過PathChildrenCache類來實現的,監聽器注冊是通過PathChildrenCacheListener。
    2)Node Cache
    Node Cache用來觀察ZNode自身,如果ZNode節點本身被創建,更新或者刪除,那么Node Cache會更新緩存,并觸發事件給注冊的監聽器。
    它是通過NodeCache類來實現的,監聽器對應的接口為NodeCacheListener。
    3)Tree Cache
    Tree Cache是上兩種的合體,Tree Cache觀察的是自身+所有子節點的所有數據,并緩存所有節點數據。
    它是通過TreeCache類來實現的,監聽器對應的接口為TreeCacheListener。
    4)Curator Cache ( requires ZooKeeper 3.6+)
    Curator Cache,是在zk3.6新版本添加的特性,該版本的出現是為了逐步淘汰上面3監聽。
    它是通過CuratorCache類來實現的,監聽器對應的接口為CuratorCacheListener。

    Curator一次性的watch

    import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent;public class MyCuratorWatcher implements CuratorWatcher {@Overridepublic void process(WatchedEvent event) throws Exception {System.out.println("觸發watcher,節點路徑為:" + event.getPath());switch (event.getType()) {case NodeCreated:break;default:break;}} }//一次性的watchpublic static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}

    NodeCache監聽當前節點變化,通過NodeCacheListener接口持續監聽節點的變化來實現

    //持續監聽的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把監聽節點,轉換為nodeCachenodeCache.start(false);//默認為false 設置為true時,會自動把節點數據存放到nodeCache中;設置為false時,初始化數據為空ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) {System.out.println("NodeCache節點的初始化數據為空……");}else {System.out.println("NodeCache節點的初始化數據為"+new String(cacheData.getData()));}//設置循環監聽nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata=nodeCache.getCurrentData();if(null==cdata) {System.out.println("節點發生了變化,可能剛剛被刪除!");nodeCache.close();//關閉監聽}else {String data=new String(cdata.getData());String path=nodeCache.getCurrentData().getPath();System.out.println("節點路徑"+path+"數據發生了變化,最新數據為:"+data);}}});}

    PathChildrenCache只監聽子節點變化
    通過PathChildrenCacheListener接口持續監聽子節點來實現

    //持續監聽watch子節點的任何變化public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception {final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true);//把監聽節點,轉換為childrenCache/*** StartMode:初始化方式* POST_INITIALIZED_EVENT: 異步初始化,初始化之后會觸發事件(會進入下面的第一個case)* NORMAL:異步初始化 (不會進入下面的第一個case)* BUILD_INITIAL_CACHE: 同步初始化(把節點數據同步緩存到Cache中)*/childrenCache.start(StartMode.NORMAL);List<ChildData> childDataList=childrenCache.getCurrentData();System.out.println("當前節點所有子節點的數據列表如下:");for (ChildData childData : childDataList) {System.out.println(new String(childData.getData()));}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("子節點初始化OK…");break;case CHILD_ADDED:System.out.println("子節點"+event.getData().getPath()+"已被成功添加,數據data="+new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("子節點"+event.getData().getPath()+"數據發生變化,新數據data="+new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("子節點"+event.getData().getPath()+"已被移除~");break;case CONNECTION_RECONNECTED:System.out.println("正在嘗試重新建立連接…");break;case CONNECTION_SUSPENDED:System.out.println("連接狀態被暫時停止…");break;default:break;}}});}

    TreeCache是上兩者的合體,既監聽自身,也監聽所有子節點變化
    通過TreeCacheListener接口來實現

    public static void treeCache(CuratorFramework zkClient) throws Exception {final String path = "/treeChildrenCache";final TreeCache treeCache = new TreeCache(zkClient, path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {switch (event.getType()){case NODE_ADDED:System.out.println("節點變動觸發:NODE_ADDED:" + event.getData().getPath());break;case NODE_REMOVED:System.out.println("節點變動觸發:NODE_REMOVED:" + event.getData().getPath());break;case NODE_UPDATED:System.out.println("節點變動觸發:NODE_UPDATED:" + event.getData().getPath());break;case CONNECTION_LOST:System.out.println("節點變動觸發:CONNECTION_LOST:" + event.getData().getPath());break;case CONNECTION_RECONNECTED:System.out.println("節點變動觸發:CONNECTION_RECONNECTED:" + event.getData().getPath());break;case CONNECTION_SUSPENDED:System.out.println("節點變動觸發:CONNECTION_SUSPENDED:" + event.getData().getPath());break;case INITIALIZED:System.out.println("節點變動觸發:INITIALIZED:" + event.getData().getPath());break;default:break;}}});//據需可以繼續做一些其他的增刪改操作zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000);zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path);Thread.sleep(1000);zkClient.close();}

    Curator Cache,是在zk3.6新版本添加的特性,Curator需5.+
    它的出現是為了替換以上3個監聽(NodeCache、PathCache、TreeCache),它通過CuratorCacheListener.builder().for**來選擇對應的監聽。最后再通過curatorCache.listenable().addListener(listener);注冊監聽。

    public static void curatorCache1(CuratorFramework zkClient) {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {case NODE_CREATED://各種判斷break;default:break;}}});}public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();//構建監聽器//新舊對照://1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("節點改變了...");}}).build();//添加監聽curatorCache.listenable().addListener(listener);//開啟監聽curatorCache.start();//讓線程休眠30s(為了方便測試)Thread.sleep(1000 * 30);} package org.example.zookeeper.curator;import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import org.testng.annotations.Test;import java.util.List;/*** @ClassName: CuratorDao* @Description:* @Author: 88578* @Date: 2022/5/1 14:17*/ public class CuratorDao {//使用curator(遞歸)添加節點//級聯創建節點(原生API不支持/后臺客戶端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//創建父節點,如果需要的話.withMode(CreateMode.PERSISTENT) //指定節點是臨時的,還是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定節點的操作權限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "節點已成功創建…");}//使用curator(遞歸)刪除節點//刪除node節點及其子節點public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed() //保證刪除:如果刪除失敗,那么在后端還是繼續會刪除,直到成功.deletingChildrenIfNeeded() //級聯刪除子節點//.withVersion(1)//版本號可以據需使用.forPath(nodePath);System.out.println(nodePath + "節點已刪除成功…");}//使用curator更新節點數據//更新節點data數據public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本號據需使用,默認可以不帶System.out.println(nodePath + "節點數據已修改成功…");}//使用curator查詢節點數據//查詢node節點數據public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("節點" + nodePath + "的數據為" + new String(data));System.out.println("節點的版本號為:" + stat.getVersion());}//使用curator查詢節點的子節點//打印node子節點public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("開始打印子節點");for (String str : childNodes) {System.out.println(str);}}//使用curator判斷節點是否存在//判斷node節點是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "節點不存在" : "節點存在");}/**************使用Curator高級API特性之Cache緩存監控節點變化*************///一次性的watchpublic static void watchOnce(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}//NodeCache監聽當前節點變化//通過NodeCacheListener接口持續監聽節點的變化來實現//持續監聽的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient, String nodePath) throws Exception {final NodeCache nodeCache = new NodeCache(zkClient, nodePath);//把監聽節點,轉換為nodeCachenodeCache.start(false);//默認為false 設置為true時,會自動把節點數據存放到nodeCache中;設置為false時,初始化數據為空ChildData cacheData = nodeCache.getCurrentData();if (null == cacheData) {System.out.println("NodeCache節點的初始化數據為空……");} else {System.out.println("NodeCache節點的初始化數據為" + new String(cacheData.getData()));}//設置循環監聽nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata = nodeCache.getCurrentData();if (null == cdata) {System.out.println("節點發生了變化,可能剛剛被刪除!");nodeCache.close();//關閉監聽} else {String data = new String(cdata.getData());String path = nodeCache.getCurrentData().getPath();System.out.println("節點路徑" + path + "數據發生了變化,最新數據為:" + data);}}});}//PathChildrenCache只監聽子節點變化//通過PathChildrenCacheListener接口持續監聽子節點來實現//持續監聽watch子節點的任何變化public static void watchForeverByPathChildrenCache(CuratorFramework zkClient, String nodePath) throws Exception {final PathChildrenCache childrenCache = new PathChildrenCache(zkClient, nodePath, true);//把監聽節點,轉換為childrenCache/*** StartMode:初始化方式* POST_INITIALIZED_EVENT: 異步初始化,初始化之后會觸發事件(會進入下面的第一個case)* NORMAL:異步初始化 (不會進入下面的第一個case)* BUILD_INITIAL_CACHE: 同步初始化(把節點數據同步緩存到Cache中)*/childrenCache.start(PathChildrenCache.StartMode.NORMAL);List<ChildData> childDataList = childrenCache.getCurrentData();System.out.println("當前節點所有子節點的數據列表如下:");for (ChildData childData : childDataList) {System.out.println(new String(childData.getData()));}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("子節點初始化OK…");break;case CHILD_ADDED:System.out.println("子節點" + event.getData().getPath() + "已被成功添加,數據data=" + new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("子節點" + event.getData().getPath() + "數據發生變化,新數據data=" + new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("子節點" + event.getData().getPath() + "已被移除~");break;case CONNECTION_RECONNECTED:System.out.println("正在嘗試重新建立連接…");break;case CONNECTION_SUSPENDED:System.out.println("連接狀態被暫時停止…");break;default:break;}}});}//TreeCache是上兩者的合體,既監聽自身,也監聽所有子節點變化//通過TreeCacheListener接口來實現public static void treeCache(CuratorFramework zkClient, String nodePath) throws Exception { // final String path = "/treeChildrenCache";final TreeCache treeCache = new TreeCache(zkClient, nodePath);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {switch (event.getType()) {case NODE_ADDED:System.out.println("節點變動觸發:NODE_ADDED:" + event.getData().getPath());break;case NODE_REMOVED:System.out.println("節點變動觸發:NODE_REMOVED:" + event.getData().getPath());break;case NODE_UPDATED:System.out.println("節點變動觸發:NODE_UPDATED:" + event.getData().getPath());break;case CONNECTION_LOST:System.out.println("節點變動觸發:CONNECTION_LOST:" + event.getData().getPath());break;case CONNECTION_RECONNECTED:System.out.println("節點變動觸發:CONNECTION_RECONNECTED:" + event.getData().getPath());break;case CONNECTION_SUSPENDED:System.out.println("節點變動觸發:CONNECTION_SUSPENDED:" + event.getData().getPath());break;case INITIALIZED:System.out.println("節點變動觸發:INITIALIZED:" + event.getData().getPath());break;default:break;}}});//據需可以繼續做一些其他的增刪改操作zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);Thread.sleep(1000);zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath + "/c1");Thread.sleep(1000);zkClient.delete().forPath(nodePath + "/c1");Thread.sleep(1000);zkClient.delete().forPath(nodePath);Thread.sleep(1000);zkClient.close();}/*Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+它的出現是為了替換以上3個監聽(NodeCache、PathCache、TreeCache),它通過CuratorCacheListener.builder().for***來選擇對應的監聽。最后再通過curatorCache.listenable().addListener(listener);注冊監聽。*/public static void curatorCache1(CuratorFramework zkClient) {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {case NODE_CREATED://各種判斷break;default:break;}}});}public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.builder(zkClient, path).build();//構建監聽器//新舊對照://1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("節點改變了...");}}).build();//添加監聽curatorCache.listenable().addListener(listener);//開啟監聽curatorCache.start();//讓線程休眠30s(為了方便測試)Thread.sleep(1000 * 30);}@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同時,zk也被啟動CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}public static void main(String[] args) throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同時,zk也被啟動CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.watchOnce(zkClient, "/xiaosi/test"); // CuratorDao.watchForeverByNodeCache(zkClient, "/xiaosi/test"); // CuratorDao.watchForeverByPathChildrenCache(zkClient, "/xiaosi/test");CuratorDao.treeCache(zkClient, "/xiaosi/test4");CuratorDao dao = new CuratorDao();String nodePath = "/super/succ";dao.createNodes(zkClient, nodePath, "super");//創建節點 // dao.updateNodeData(zkClient, nodePath, "hello");//更新節點數據 // dao.deleteNodeWithChild(zkClient, nodePath); // dao.getNodeData(zkClient, nodePath); // dao.printChildNodes(zkClient, nodePath); // dao.checkNodeExists(zkClient, nodePath); // dao.watchOnce(zkClient, nodePath); // dao.watchForeverByNodeCache(zkClient, nodePath); // dao.watchForeverByPathChildrenCache(zkClient, nodePath);Thread.sleep(300000); //延遲sleep時間,便于后才修改節點,看前臺是否會繼續觸發watchcto.closeZKClient();} }

    5.4 使用Curator創建/驗證ACL(訪問權限列表)

    5.4.1 連通Zk時,就指定登錄權限

    //本類代碼,只涉及ACL操作 public class CuratorAcl {public CuratorFramework client = null;public static final String workspace="workspace";public static final String zkServerPath = "192.168.31.216:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情況下,登錄賬號、密碼可以通過構造參數傳入,暫時固定,據需修改.connectString(zkServerPath).sessionTimeoutMs(20000).retryPolicy(retryPolicy).namespace(workspace).build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}} }

    5.4.2寫一個把明文的賬號密碼轉換為加密后的密文的工具類

    //把明文的賬號密碼轉換為加密后的密文 public class AclUtils {public static String getDigestUserPwd(String loginId_Username_Passwd) {String digest = "";try {digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}return digest;}public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {String id = "mayun:mayun";String idDigested = getDigestUserPwd(id);System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=} }

    5.4.3使用自定義工具類AclUtils,一次性給多個用戶賦Acl權限

    public static List<ACL> getAcls() throws NoSuchAlgorithmException{List<ACL> acls=new ArrayList<ACL>();Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));acls.add(new ACL(Perms.ALL, mayun));//給mayun一次性賦值所有權限acls.add(new ACL(Perms.READ, lilei));acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//給lilei分兩次賦權限(目的:看不同的賦權方式)return acls;}

    5.4.4級聯創建節點,并賦予節點操作權限

    public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {String result=cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true)//給節點賦權限.forPath(nodePath, nodeData.getBytes());System.out.println("創建成功,result="+result); }

    5.4.5讀取節點數據

    public void getNodeData(CuratorAcl cto,String nodePath) throws Exception {Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);if(null!=stat) {System.out.println("節點" + nodePath + "的數據為: " + new String(data));System.out.println("該節點的版本號為: " + stat.getVersion());}}

    5.4.6修改具有ACL權限節點的data數據

    public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("節點修改后的數據為:"+nodeNewData);cto.client.setData().forPath(nodePath, nodeNewData.getBytes());System.out.println("修改成功");}

    5.4.7兩種方法判斷node節點是否存(優先使用第一種)

    public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("-----------=================-------------");//判斷節點是否存在,方法一(路徑前面會自動添加workspace)Stat stat=cto.client.checkExists().forPath(nodePath);System.out.println("======="+stat==null?"不存在":"存在");//判斷節點是否存在,方法二(路徑前面需手動添加workspace)Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);System.out.println("======="+stat2==null?"不存在":"存在");}

    ACL權限的main方法測試
    通過java代碼給某個節點添加ACL權限后,后臺登陸zk客戶端時,是無法直接操作該節點被ACL控制的權限的操作的,要想操作具有ACL權限的節點,方法只有兩個。
    1、知道該節點輸入用戶都有哪些,用這些用戶的賬號密碼登錄
    2、使用超級用戶登錄
    #getAcl /succ/testDigest 查看都有哪些用戶對該節點有操作權限
    #addauth digest succ:succ 登錄

    public static void main(String[] args) throws Exception {CuratorAcl cto = new CuratorAcl();boolean isZkCuratorStarted = cto.client.isStarted();System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連接成功" : "已關閉"));String nodePath1 = "/acl/tom/bin";String nodePath2 = "/acl/father/child/sub"; // cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls());//首次創建,報錯,只能創建父節點,子節點無法創建 // cto.client.setACL().withACL(getAcls()).forPath("/curatorNode");//給節點創建權限 // cto.getNodeData(cto, "/super"); // cto.getNodeData(cto, "/acl");cto.checkNodeExists(cto, nodePath2);cto.closeZKClient();boolean isZkCuratorStarted2 = cto.client.isStarted();System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連接成功" : "已關閉"));}

    7 分布式鎖

    Curator的5種分布式鎖及其對應的核心類:

    1.重入式排它鎖 Shared Reentrant Lock,實現類:InterProcessMutex

    2.不可重入排它鎖 Shared Lock ,實現類:InterProcessSemaphoreMutex

    3.可重入讀寫鎖 Shared Reentrant Read Write Lock,實現類: InterProcessReadWriteLock 、InterProcessLock

    4.多鎖對象容器(多共享鎖) Multi Shared Lock,將多個鎖作為單個實體管理的容器,實現類:InterProcessMultiLock、InterProcessLock

    5.共享信號鎖Shared Semaphore ,實現類:InterProcessSemaphoreV2

    跨 JVM 工作的計數信號量。使用相同鎖路徑的所有 JVM 中的所有進程將實現進程間有限的租用集。此外,這個信號量大多是“公平的”——每個用戶將按照請求的順序獲得租用(從 ZK 的角度來看)。

    有兩種模式可用于確定信號量的最大租用。在第一種模式中,最大租用是由給定路徑的用戶維護的約定。在第二種模式中,SharedCountReader 用作給定路徑的信號量的方法,以確定最大租用。

    7.1.重入式排它鎖InterProcessMutex

    public InterProcessMutex(CuratorFramework client, String path)
    獲取/釋放鎖的API

    public void acquire() throws Exception;//獲取鎖,獲取不到鎖一直阻塞,zk連接中斷則拋異常
    public boolean acquire(long time, TimeUnit unit) throws Exception;//獲取鎖,超過該時間后,直接返回false,zk連接中斷則拋異常
    public void release() throws Exception;//釋放鎖
    通過release()方法釋放鎖。InterProcessMutex 實例可以重用。Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。為了撤銷mutex, 調用下面的方法

    /**

    • 將鎖設為可撤銷的. 當別的進程或線程想讓你釋放鎖時Listener會被調用。
    • Parameters:
    • listener - the listener
      */
      public void makeRevocable(RevocationListener listener)

    7.2.不可重入排它鎖InterProcessSemaphoreMutex

    public InterProcessSemaphoreMutex(CuratorFramework client, String path)
    使用InterProcessSemaphoreMutex,調用方法類似,區別在于該鎖是不可重入的,在同一個線程中不可重入

    7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock

    一個讀寫鎖管理一對相關的鎖。一個負責讀操作,另外一個負責寫操作。讀操作在寫鎖沒被使用時可同時由多個進程使用,而寫鎖使用時不允許讀 (阻塞)。此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。從讀鎖升級成寫鎖是不成的。

    7.4.多鎖對象容器(多共享鎖) ,將多個鎖作為單個實體管理,InterProcessMultiLock、InterProcessLock

    Multi Shared Lock是一個鎖的容器。當調用acquire, 所有的鎖都會被acquire(上鎖),如果請求失敗,所有的鎖都會被release (釋放鎖)。同樣調用release時所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。主要涉及兩個類:InterProcessMultiLock、InterProcessLock

    它的構造函數需要包含的鎖的集合,或者一組ZooKeeper的path。

    public InterProcessMultiLock(List locks)
    public InterProcessMultiLock(CuratorFramework client, List paths)

    7.5.代碼

    public class ZkLock {final static Logger log = LoggerFactory.getLogger(ZkLock.class);public CuratorFramework zkClient = null; // zk的客戶端工具Curator(在本類通過new實例化的是,自動start)private static final int BASE_SLEEP_TIME_MS = 1000; // 連接失敗后,再次重試的間隔時間 單位:毫秒private static final int MAX_RETRY_TIMES = 10; // 定義失敗重試次數private static final int SESSION_TIME_OUT = 1000000; // 會話存活時間,根據業務靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服務所在的IP和客戶端端口private static final String NAMESPACE = "workspace";// 指定后,默認操作的所有的節點都會在該工作空間下進行static int j = 10;//初始化zk客戶端public ZkLock() {// 重試策略:初試時間為1s 重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);// 通過工廠建立連接zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 連接地址.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重試策略.build();zkClient.start();}public static void lockTest(CuratorFramework zkClient) throws InterruptedException {// 使用分布式鎖,所有系統同時監聽同一個節點,達到分布式鎖的目的final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {//啟動10個線程new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();// 線程等待一起執行lock.acquire();// 分布式鎖,數據同步// 處理業務j--;System.out.println(j);} catch (Exception e) {e.printStackTrace();} finally {try {// 釋放鎖lock.release();} catch (Exception e) {e.printStackTrace();}}}}, "t" + i).start();}Thread.sleep(1000);countDownLatch.countDown();// 模擬十個線程一起并發.指定一起執行}public static void main(String[] args) throws InterruptedException {ZkLock zkl = new ZkLock();ZkLock.lockTest(zkl.zkClient);} }

    8.分布式計數器

    利用Zookeeper可以實現一個集群共享的計數器。只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器:DistributedAtomicInteger,DistributedAtomicLong。這個兩個除了計數范圍(int、long)不同外,沒有任何不同。操作也非常簡單,跟AtomicInteger大同小異。

    increment() //加1
    decrement() //減1
    compareAndSet(Integer expectedValue, Integer newValue) //cas操作
    get() //獲取當前值
    add():增加特定的值
    subtract(): 減去特定的值
    trySet(): 嘗試設置計數值
    使用的時候,必須檢查返回結果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

    public static void count(CuratorFramework zkClient) throws Exception {//分布式計數器DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100));//初始化counter.forceSet(0);AtomicValue<Integer> value = counter.increment();//原子自增System.out.println("原值為"+value.preValue());System.out.println("更改后的值為"+value.postValue());System.out.println("狀態"+value.succeeded());}public static void main(String[] args) throws Exception {ZkLock zkl=new ZkLock();//ZkLock.lockTest(zkl.zkClient);ZkLock.count(zkl.zkClient);}

    另外Curator還有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式隊列DistributedQueueDistributed Queue
    https://blog.csdn.net/succing/article/details/121779721

    https://blog.csdn.net/succing/article/details/121793494

    https://blog.csdn.net/succing/article/details/121844550
    https://blog.csdn.net/succing/article/details/121802687

    總結

    以上是生活随笔為你收集整理的大数据之Zookeeper的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。