大数据之Zookeeper
文章目錄
- 1. Zookeeper 入門
- 1.1 概述
- 1.2 特點
- 1.3 數據結構
- 1.4 應用場景
- 2. Zookeeper 安裝
- 2.1 下載地址
- 2.2 本地模式安裝部署
- 2.3 分布式安裝部署
- 2.4 配置參數解讀
- 3. Zookeeper 內部原理
- 3.1 選舉機制
- 3.2 節點類型
- 3.3 Stat 結構體
- 3.4 監聽器原理
- 3.5 寫數據流程
- 4. Zookeeper 實戰
- 4.1 客戶端命令行操作
- 4.2 API 操作
- 4.3.1 IDEA 環境搭建
- 4.3.2 創建 ZooKeeper 客戶端
- 4.3.4 獲取子節點并監聽節點變化
- 4.3.5 判斷 Znode 是否存在
- 4.3 監聽服務器節點動態上下線案例
- 5 zookeeper框架
- 5.1 org.apache.zookeeper
- 5.2 zkclient
- 5.2.1 簡介
- 5.2.2 Maven依賴
- 5.2.3 ZkClient 的設計
- 5.2.4 重要處理流程說明
- 5.2.5 客戶端處理變更(Watcher通知)
- 5.2.6 序列化處理
- 5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
- 5.2.8 API介紹
- 5.2.9 demo
- 5.3 Curator
- 5.3.1 簡介
- 5.3.2 版本問題
- 5.3.3 CuratorFramework
- 5.3.4 curator-recipes
- 5.3.5 知識點
- 5.3.6 Maven依賴
- 5.3.7 api
- 5.3.8 使用Curator高級API特性之Cache緩存監控節點變化
- 5.4 使用Curator創建/驗證ACL(訪問權限列表)
- 5.4.1 連通Zk時,就指定登錄權限
- 5.4.2寫一個把明文的賬號密碼轉換為加密后的密文的工具類
- 5.4.3使用自定義工具類AclUtils,一次性給多個用戶賦Acl權限
- 5.4.4級聯創建節點,并賦予節點操作權限
- 5.4.5讀取節點數據
- 5.4.6修改具有ACL權限節點的data數據
- 5.4.7兩種方法判斷node節點是否存(優先使用第一種)
- 7 分布式鎖
- 7.1.重入式排它鎖InterProcessMutex
- 7.2.不可重入排它鎖InterProcessSemaphoreMutex
- 7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock
- 7.4.多鎖對象容器(多共享鎖) ,將多個鎖作為單個實體管理,InterProcessMultiLock、InterProcessLock
- 7.5.代碼
- 8.分布式計數器
1. Zookeeper 入門
1.1 概述
Zookeeper 是一個開源的分布式的,為分布式應用提供協調服務的 Apache 項目。
??Zookeeper 從設計模式角度來理解:是一個基于觀案者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應。
??
1.2 特點
1)Zookeeper:一個領導者(Leader) ,多個跟隨者(Follower)組成的集群。
2)集群中只要有半數以上節點存活,Zookeeper 集群就能正常服務。
3)全局數據一致:每個 Server 保存一份相同的數據副本,Client 無論連接到哪個 Server,數據都是一致的。
4)更新請求順序進行,來自同一個 Client 的更新請求按其發送順序依次執行。
5)數據更新原子性,一次數據更新要么成功,要么失敗。
6)實時性,在一定時間范圍內,Client 能讀到最新數據。
1.3 數據結構
ZooKeeper 數據模型的結構與 Unix 文件系統很類似,整體上可以看作是一棵樹,每個節點稱做一個 ZNode。每一個 ZNode 默認能夠存儲 1 MB 的數據,每個 ZNode 都可以通過其路徑唯一標識。
1.4 應用場景
提供的服務包括:統一命名服務、統一配置管理、統一集群管理、服務器節點動態上下線、軟負載均衡等。
統一命名服務
在分布式環境下,經常需要對應用/服務進行統一命名 ,便于識別。例如:IP 不容易記住,而域名容易記住。
統一配置管理
(1)分布式環境下,配置文件同步非常常見。
??① 一般要求一個集群中,所有節點的配置信息是一致的,比如 Kafka 集群。
??② 對配置文件修改后,希望能夠快速同步到各個節點上。
(2)配置管理可交由 ZooKeeper 實現。
??① 可將配置信息寫入 ZooKeeper 上的一個 Znode 。
??② 各個客戶端服務器監聽這個 Znode。
??③ 一旦 Znode 中的數據被修改,ZooKeeper 將通知各個客戶端服務器。
統一集群管理
(1)分布式環境中,實時掌握每個節點的狀態是必要的。
??可根據節點實時狀態做出一些調整。
(2)ZooKeeper 可以實現實時監控節點狀態變化
??① 可將節點信息寫入Z ooKeeper 上的一個 ZNode。
??② 監聽這個 ZNode 可獲取它的實時狀態變化。
服務器動態上下線
軟負載均衡
在 Zookeeper 中記錄每臺服務器的訪問數,讓訪問數最少的服務器去處理最新的客戶端請求。
2. Zookeeper 安裝
2.1 下載地址
zookeeper 官網
2.2 本地模式安裝部署
準備工作
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/ mv apache-zookeeper-3.5.6-bin/ zookeeper mv zoo_sample.cfg zoo.cfg mkdir -p /usr/local/zookeeper/datavim zoo.cfg dataDir=/usr/local/zookeeper/datavim /etc/profile
在配置文件中添加以下內容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
source /etc/profile
啟動 Zookeeper
zkServer.sh start
啟動客戶端
zkCli.sh
退出客戶端
quit
停止 Zookeeper
zkServer.sh stop
2.3 分布式安裝部署
集群規劃
在 master、slave1 和 slave2 三個節點上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目錄內容到 slave1、slave2
xsync zookeeper/
配置服務器編號
① 在 /usr/local/zookeeper/ 這個目錄下創建 zkData
mkdir data
② /usr/local/zookeeper/data 目錄下創建一個 myid 的文件
touch myid
③ 編輯 myid 文件
vim myid
在文件中添加與 server 對應的編號:
0
④ 分發到其他機器上
xsync myid
并分別在 slave1、slave2 上修改 myid 文件中內容為 1、2
配置 zoo.cfg 文件
① 將 /usr/local/zookeeper/conf 這個路徑下的 zoo_sample.cfg 修改為 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打開 zoo.cfg 文件,修改 dataDir 路徑
dataDir=/usr/local/zookeeper/data
增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg
修改環境變量
① 打開配置文件
vim /etc/profile
② 在配置文件中添加以下內容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三臺機器)
source /etc/profile
集群操作
① 三臺機器分別啟動 Zookeeper
zkServer.sh start
② 三臺機器分別關閉 Zookeeper
zkServer.sh stop
編寫 Zookeeper 的群起群關腳本
① 在 /usr/local/bin 目錄下創建 zk 文件
vim zk.sh
修改腳本 zk 具有執行權限
chmod 777 zk.sh
調用腳本形式:zk start 或 zk stop
2.4 配置參數解讀
Zookeeper 中的配置文件 zoo.cfg 中參數含義解讀如下:
tickTime =2000:通信心跳數,Zookeeper 服務器與客戶端心跳時間,單位毫秒
Zookeeper 使用的基本時間,服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個tickTime 時間就會發送一個心跳,時間單位為毫秒。它用于心跳機制,并且設置最小的 session 超時時間為兩倍心跳時間。(session 的最小超時時間是 2*tickTime)
initLimit =10:LF 初始通信時限
集群中的 Follower 跟隨者服務器與 Leader 領導者服務器之間初始連接時能容忍的最多心跳數(tickTime的數量),用它來限定集群中的 Zookeeper 服務器連接到 Leader 的時限。
syncLimit =5:LF 同步通信時限
集群中 Leader 與 Follower 之間的最大響應時間單位,假如響應超過 syncLimit * tickTime,Leader 認為 Follwer 死掉,從服務器列表中刪除 Follwer。
dataDir:數據文件目錄+數據持久化路徑
主要用于保存 Zookeeper 中的數據。
clientPort =2181:客戶端連接端口
監聽客戶端連接的端口。
server.A=B:C:D
A 是一個數字,表示這個是第幾號服務器;集群模式下配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件里面有一個數據就是 A 的值,Zookeeper 啟動時讀取此文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個server。
B 是這個服務器的 ip 地址;
C 是這個服務器與集群中的 Leader 服務器交換信息的端口;
D 是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。
3. Zookeeper 內部原理
3.1 選舉機制
半數機制
集群中半數以上機器存活,集群可用。所以 Zookeeper 適合安裝奇數臺服務器。
Zookeeper 雖然在配置文件中并沒有指定 Master 和 Slave。但是,Zookeeper 工作時,是有一個節點為 Leader,其他則為 Follower,Leader 是通過內部的選舉機制臨時產生的。
選舉過程例子
假設有五臺服務器組成的 Zookeeper 集群,它們的 id 從1-5,同時它們都是最新啟動的,也就是沒有歷史數據,在存放數據量這一點上,都是一樣的。假設這些服務器依序啟動。
① 服務器 1 啟動,此時只有它一臺服務器啟動了,它發出去的報文沒有任何響應,所以它的選舉狀態一直是 LOOKING 狀態。
② 服務器 2 啟動,它與最開始啟動的服務器 1 進行通信,互相交換自己的選舉結果,由于兩者都沒有歷史數據,所以 id 值較大的服務器 2 勝出,但是由于沒有達到超過半數以上的服務器都同意選舉它(這個例子中的半數以上是 3),所以服務器 1、2 還是繼續保持 LOOKING 狀態。
③ 服務器 3 啟動,根據前面的理論分析,服務器 3 成為服務器 1、2、3 中的老大,而與上面不同的是,此時有三臺服務器選舉了它,所以它成為了這次選舉的 Leader。
④ 服務器 4 啟動,根據前面的分析,理論上服務器4應該是服務器 1、2、3、4 中最大的,但是由于前面已經有半數以上的服務器選舉了服務器 3,所以它只能接收當小弟的命了。
⑤ 服務器 5 啟動,同 4 一樣當小弟。
3.2 節點類型
持久(Persistent)
客戶端和服務器端斷開連接后,創建的節點不刪除
短暫(Ephemeral)
客戶端和服務器端斷開連接后,創建的節點自己刪除
節點類型
① 持久化目錄節點
客戶端與 Zookeeper 斷開連接后,該節點依舊存在。
② 持久化順序編號目錄節點
客戶端與 Zookeeper 斷開連接后,該節點依舊存在,只是 Zookeeper 給該節點名稱進行順序編號
③ 臨時目錄節點
客戶端與 Zookeeper 斷開連接后,該節點被刪除
④ 臨時順序編號目錄節點
客戶端與 Zookeeper 斷開連接后,該節點被刪除,只是 Zookeeper 給該節點名稱進行順序編號。
說明: 創建 znode 時設置順序標識,znode 名稱后會附加一個值,順序號是一個單調遞增的計數器,由父節點維護。
注意: 在分布式系統中,順序號可以被用于為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序。
3.3 Stat 結構體
czxid: 創建節點的事務 zxid
每次修改 ZooKeeper 狀態都會收到一個 zxid 形式的時間戳,也就是 ZooKeepe r事務 ID。
事務 ID 是 ZooKeeper 中所有修改總的次序。每個修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前發生。
ctime: znode 被創建的毫秒數(從 1970 年開始)
mzxid: znode 最后更新的事務 zxid
mtime: znode 最后修改的毫秒數(從 1970 年開始)
pZxid: znode 最后更新的子節點 zxid
cversion : znode 子節點變化號,znode 子節點修改次數
dataversion: znode 數據變化號
aclVersion: znode 訪問控制列表的變化號
ephemeralOwner: 如果是臨時節點,這個是 znode 擁有者的 session id。如果不是臨時節點則是 0。
dataLength: znode 的數據長度
numChildren: znode 子節點數量
3.4 監聽器原理
監聽原理詳解:
① 首先要有一個 main() 線程
② 在 main 線程中創建 Zokeeper 客戶端,這時就會創建兩個線程,一個負責網絡連接通信(connet),一個負責監聽(listener) 。
③ 通過 connect 線程將注冊的監聽事件發送給 Zookeeper。
④ 在 Zookeeper 的注冊監聽器列表中將注冊的監聽事件添加到列表中。
⑤ Zookeeper 監聽到有數據或路徑變化,就會將這個消息發送給 listener 線程。
⑥ listener 線程內部調用了 process() 方法。
常見的監聽
① 監聽節點數據的變化
get -w path
② 監聽子節點增減的變化
ls -w path
3.5 寫數據流程
4. Zookeeper 實戰
4.1 客戶端命令行操作
啟動客戶端
zkCli.sh
顯示所有操作命令
help
查看當前 znode 中所包含的內容
ls /
ls2 /
查看當前節點詳細數據
ls -s /
分別創建 2 個普通節點
create /animals “dog”
create /animals/small “ant”
獲得節點的值
get /animals
get /animals/small
創建短暫節點
create -e /animals/big “elephant”
創建帶序號的節點
create -s /animals/middle “hourse”
修改節點數據值
set /animals/small “bug”
節點的值變化監聽
① 在 slave1 主機上注冊監聽 /animals 節點數據變化
get -w /animals
② 在 slave2 主機上修改 /animals 節點的數據
set /animals “cat”
③ 觀察 slave1 主機收到子節點變化的監聽
節點的子節點變化監聽(路徑變化)
① 在 slave1 主機上注冊監聽 /animals 節點的子節點變化
ls -w /animals
② 在 slave2 主機 /animals 節點上創建子節點
create /animals/mini “fly”
③ 觀察 slave1 主機收到子節點變化的監聽
刪除節點
delete /animals/big
遞歸刪除節點
deleteall /animals/mini
查看節點狀態
stat /animals
4.2 API 操作
4.3.1 IDEA 環境搭建
創建一個 Maven 工程
在 pom 文件中添加依賴
在項目的 src/main/resources 目錄下,新建一個文件,命名為 “log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.3.2 創建 ZooKeeper 客戶端
@SpringBootTest
public class ZookeeperTest {
}
4.3.3 創建子節點
先將上面的 init() 方法前面的注解 @Test 改為 @BeforeAll
// 創建子節點
@SpringBootTest public class ZookeeperTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}});}@Testpublic void createNode() throws Exception { // 參數1:要創建的節點的路徑; 參數2:節點數據 ; 參數3:節點權限 ;參數4:節點的類型String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(path);} }4.3.4 獲取子節點并監聽節點變化
// 獲取子節點并監聽節點變化 @SpringBootTest public class WatchTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@Testpublic void getChildrenAndWatch() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}// 延時阻塞Thread.sleep(Long.MAX_VALUE);}@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {List<String> children = null;try {children = zkClient.getChildren("/", true);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}for (String child : children) {System.out.println(child);}}});} }4.3.5 判斷 Znode 是否存在
// 判斷znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}
4.3 監聽服務器節點動態上下線案例
需求
某分布式系統中,主節點可以有多臺,可以動態上下線,任意一臺客戶端都能實時感知到主節點服務器的上下線。
需求分析
代碼實現
① 先在集群上創建 /servers 節點
create /servers “servers”
② 服務器端向 Zookeeper 注冊代碼
package zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {args = new String[]{"slave1"};DistributeServer server = new DistributeServer();// 1.連接zookeeper集群server.getConnect();// 2.注冊節點server.register(args[0]);// 3.業務邏輯處理server.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}}); }private void register(String hostname) throws KeeperException, InterruptedException {String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " is online"); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }}
③ 客戶端代碼
package zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient;public static void main(String[] args) throws Exception {DistributeClient client = new DistributeClient();// 1.連接zookeeper集群client.getConnect();// 2.注冊監聽client.getChildren();// 3.業務邏輯處理client.business(); }private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {try {getChildren();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}); }private void getChildren() throws KeeperException, InterruptedException {List<String> children = zkClient.getChildren("/servers", true);// 存儲服務器節點主機名稱集合ArrayList<String> hosts = new ArrayList<String>();for (String child : children) {byte[] data = zkClient.getData("/servers/" + child, false, null);hosts.add(new String(data));}System.out.println(hosts); }private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE); }}
5 zookeeper框架
5.1 org.apache.zookeeper
5.2 zkclient
5.2.1 簡介
ZkClient 是由 Datameer 的工程師開發的開源客戶端,對 Zookeeper 的原生 API 進行了包裝,實現了超時重連、Watcher 反復注冊等功能。
在使用 ZooKeeper 的 Java 客戶端時,經常需要處理幾個問題:重復注冊 watcher、session失效重連、異常處理。
IZKConnection:是一個ZkClient與Zookeeper之間的一個適配器;在代碼里直接使用的是ZKClient,實質上還是委托了zookeeper來處理了。
在ZKClient中,根據事件類型,分為
節點事件(數據事件),對應的事件處理器是IZKDataListener;
子節點事件,對應的事件處理器是IZKChildListener;
Session事件,對應的事件處理器是IZKStatusListener;
ZkEventThread:是專門用來處理事件的線程
目前已經運用到了很多項目中,知名的有 Dubbo、Kafka、Helix。
5.2.2 Maven依賴
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version> </dependency>5.2.3 ZkClient 的設計
從上述結構上看,IZKConnection 是一個 ZkClient 與 ZooKeeper 之間的一個適配器。在代碼里直接使用的是 ZKClient,其實質還是委托了 zookeeper 來處理了。
使用 ZooKeeper 客戶端來注冊 watcher 有幾種方法: 1、創建 ZooKeeper 對象時指定默認的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注冊的是某個節點的事件處理器(watcher),getchildren 注冊的是子節點的事件處理器(watcher)。而在 ZKClient 中,根據事件類型,分為了節點事件(數據事件)、子節點事件。對應的事件處理器則是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相關的事件和事件處理器。
ZkEventThread 是專門用來處理事件的線程。
5.2.4 重要處理流程說明
啟動 ZKClient
在創建 ZKClient 對象時,就完成了到 ZooKeeper 服務器連接的建立。具體過程是這樣的:
啟動時,指定好 connection string,連接超時時間,序列化工具等。
創建并啟動 eventThread,用于接收事件,并調度事件監聽器 Listener 的執行。
連接到 zookeeper 服務器,同時將 ZKClient 自身作為默認的 Watcher。
為節點注冊Watcher:
ZooKeeper 的三個方法:getData、getChildren、exists.
ZKClient 都提供了相應的代理方法。就拿 exists 來看:
可以看到,是否注冊 watcher,由 hasListeners(path)來決定的。
hasListeners 就是看有沒有與該數據節點綁定的 listener。
所以,默認情況下,都會自動的為指定的 path 注冊 watcher,并且是默認的 watcher (ZKClient)。怎么才能讓 hasListeners 判定值為 true 呢,也就是怎么才能為 path 綁定 Listener 呢?
ZKClient提供了訂閱功能:
一個新建的會話,只需要在取得響應的數據節點后,調用 subscribteXxx 就可以訂閱上相應的事件了。
5.2.5 客戶端處理變更(Watcher通知)
前面已經知道,ZKClient 是默認的 Watcher,并且在為各個數據節點注冊的 Watcher 都是這個默認的 Watcher。那么該是如何將各種事件通知給相應的 Listener 呢?
處理過程大致可以概括為下面的步驟:
判斷變更類型:變更類型分為 State 變更、ChildNode 變更(創建子節點、刪除子節點、修改子節點數據)、NodeData 變更(創建指定 node,刪除節點,節點數據變更)。
取出與 path 關聯的 Listeners,并為每一個 Listener 創建一個 ZKEvent,將 ZkEvent 交給 ZkEventThread 處理。
ZkEventThread 線程,拿到 ZkEvent 后,只需要調用 ZkEvent 的 run 方法進行處理。 從這里也可以知道,具體的怎么如何調用 Listener,還要依賴于 ZkEvent 的 run()實現了。
注冊監聽 watcher:
| IZkChildListener(子節點) | ZkClient的subscribeChildChanges方法 | ZkClient 的unsubscribeChildChanges 方法 |
| IZkDataListener(數據) | ZkClient 的subscribeDataChanges 方法 | ZkClient 的 unsubscribeDataChanges 方法 |
| IZkStateListener(客戶端狀 態) | ZkClient 的 subscribeStateChanges 方 法 | ZkClient 的 unsubscribeStateChanges 方法 |
在 ZkClient 中客戶端可以通過注冊相關的事件監聽來實現對 Zookeeper 服務端時間的訂閱。
其中 ZkClient 提供的監聽事件接口有以下幾種:
其中 ZkClient 還提供了一個 unsubscribeAll 方法,來解除所有監聽。
Zookeeper 中提供的變更操作有:節點的創建、刪除,節點數據的修改:
創建操作,數據節點分為四種,ZKClient 分別為他們提供了相應的代理:
刪除節點的操作:
修改節點數據的操作:
writeDataReturnStat():寫數據并返回數據的狀態。
updateDataSerialized():修改已序列化的數據。執行過程是:先讀取數據,然后使用DataUpdater 對數據修改,最后調用 writeData 將修改后的數據發送給服務端。
5.2.6 序列化處理
ZooKeeper 中,會涉及到序列化、反序列化的操作有兩種:getData、setData。在 ZKClient 中,分別用 readData、writeData 來替代了。
對于 readData:先調用 zookeeper 的 getData,然后進行使用 ZKSerializer 進行反序列化工 作。
對于 writeData:先使用 ZKSerializer 將對象序列化后,再調用 zookeeper 的 setData。
5.2.7 ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
Watcher 自動重注冊:這個要是依賴于 hasListeners()的判斷,來決定是否再次注冊。如果對此有不清晰的,可以看上面的流程處理的說明。
Session 失效重連:如果發現會話過期,就先關閉已有連接,再重新建立連接。
異常處理:對比 ZooKeeper 和 ZKClient,就可以發現 ZooKeeper 的所有操作都是拋異常 的,而 ZKClient 的所有操作,都不會拋異常的。在發生異常時,它或做日志,或返回空, 或做相應的 Listener 調用。
相比于 ZooKeeper 官方客戶端,使用 ZKClient 時,只需要關注實際的 Listener 實現即可。所 以這個客戶端,還是推薦大家使用的。
https://www.cnblogs.com/jinchengll/p/12333213.html
5.2.8 API介紹
啟動ZKClient:在創建ZKClient對象時,就完成了到ZooKeeper服務器連接的建立
1、啟動時,制定好connection string,連接超時時間,序列化工具等
2、創建并啟動eventThread,用于接收事件,并調度事件監聽器Listener的執行
3、連接到Zookeeper服務器,同時將ZKClient自身作為默認的Watcher
為節點注冊Watcher
Zookeeper 原始API的三個方法:getData,getChildren、exists,ZKClient都提供了相應的代理方法,比如exists,
hasListeners是看有沒有與該數據節點綁定的listener
所以,默認情況下,都會自動的為指定的path注冊watcher,并且是默認的watcher(ZKClient),那么怎樣才能讓hasListeners值為true呢,也就是怎么才能為path綁定Listener呢?
ZKClient提供了訂閱功能,一個新建的會話,只需要在取得響應的數據節點后,調用subscribeXXX就可以訂閱上相應的事件了。
5.2.9 demo
Watcher
public class ZkClientWatcher {ZkClient zkClient;public ZkClientWatcher() {zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);}public void createPersistent(String path, Object data) {zkClient.createPersistent(path, data);}public void writeData(String path, Object object) {zkClient.writeData(path, object);}public void delete(String path) {zkClient.delete(path);}public boolean exists(String path) {return zkClient.exists(path);}public void deleteRecursive(String path) {zkClient.deleteRecursive(path);}//對父節點添加監聽數據變化。public void subscribe(String path) {zkClient.subscribeDataChanges(path, new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {System.out.printf("變更的節點為:%s,數據:%s\r\n", dataPath, data);}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {System.out.printf("刪除的節點為:%s\r\n", dataPath);}});}//對父節點添加監聽子節點變化。public void subscribe2(String path) {zkClient.subscribeChildChanges(path, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("父節點: " + parentPath + ",子節點:" + currentChilds + "\r\n");}});}//客戶端狀態public void subscribe3(String path) {zkClient.subscribeStateChanges(new IZkStateListener() {@Overridepublic void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {if (state == Watcher.Event.KeeperState.SyncConnected) {//當我重新啟動后start,監聽觸發System.out.println("連接成功");} else if (state == Watcher.Event.KeeperState.Disconnected) {System.out.println("連接斷開");//當我在服務端將zk服務stop時,監聽觸發} elseSystem.out.println("其他狀態" + state);}@Overridepublic void handleNewSession() throws Exception {System.out.println("重建session");}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {}});}/* @Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {}*/ } public class ZkClientWatcherTest {public static void main(String[] args) throws InterruptedException {ZkClientWatcher zkClientWatche=new ZkClientWatcher();String path="/root";zkClientWatche.deleteRecursive(path);zkClientWatche.createPersistent(path,"hello");zkClientWatche.subscribe(path);zkClientWatche.subscribe2(path);// zkClientWatche.subscribe3(path);//需要啟服務// Thread.sleep(Integer.MAX_VALUE);zkClientWatche.createPersistent(path+"/root2","word");TimeUnit.SECONDS.sleep(1);zkClientWatche.writeData(path,"hi");TimeUnit.SECONDS.sleep(1);//zkClientWatche.delete(path);//如果目錄下有內容 不能刪除 會報 Directory not empty for /root的異常zkClientWatche.deleteRecursive(path);TimeUnit.SECONDS.sleep(1); //這個main線程就結束} } public class ZookeeperUtil {/** zookeeper服務器地址 */ // public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";public static final String connectString = "localhost:2181";/** 定義session失效時間 */public static final int sessionTimeout = 5000;public static final String path = "/root"; }5.3 Curator
5.3.1 簡介
zookeeper不是為高可用性設計的,但它使用ZAB協議達到了極高的一致性。所以它經常被選作注冊中心、配置中心、分布式鎖等場景。
它的性能是非常有限的,而且API并不是那么好用。xjjdog傾向于使用基于Raft協議的Etcd或者Consul,它們更加輕量級一些。
Curator是netflix公司開源的一套zookeeper客戶端,目前是Apache的頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。Curator解決了很多zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊wathcer和NodeExistsException 異常等。
Zookeeper 原生API問題:
1.超時重連,不支持自動,需要手動操作
2.Watch注冊一次后會失效
3.不支持遞歸創建節點
Zookeeper API 升級版 Curator:
1.解決watcher的注冊一次就失效
2.提供更多解決方案并且實現簡單
3.提供常用的ZooKeeper工具類
4.編程風格更爽,點點點就可以了
5.可以遞歸創建節點等
Curator由一系列的模塊構成,對于一般開發者而言,常用的是curator-framework和curator-recipes。
5.3.2 版本問題
Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些諸如動態重新配置、watch刪除等新特性。
Curator4 統一對 ZooKeeper 3.4.x 和 3.5.x 的支持
5.3.3 CuratorFramework
Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自動連接管理:
當ZooKeeper客戶端內部出現異常, 將自動進行重連或重試, 該過程對外幾乎完全透明
監控節點數據變化事件NodeDataChanged,需要時調用updateServerList()方法
Curator recipes自動移除監控
更加清晰的API
簡化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes實現 : 選舉,共享鎖, 路徑cache, 分布式隊列,分布式優先隊列等。
5.3.4 curator-recipes
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、 Elections選舉、分布式鎖、分布式計數器、分布式Barrier、Queues隊列等
5.3.5 知識點
1.使用curator建立與zk的連接
2.使用curator添加/遞歸添加節點
3.使用curator刪除/遞歸刪除節點
4.使用curator創建/驗證 ACL(訪問權限列表)
5.使用curator監聽 單個/父 節點的變化(watch事件)
6.基于curator實現Zookeeper分布式鎖(需要掌握基本的多線程知識)
7.基于curator實現分布式計數器
5.3.6 Maven依賴
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><!--建議和本地安裝版本保持一致--><version>3.7.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version> </dependency> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version> </dependency>5.3.7 api
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class ZkConnectCuratorUtil {final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);public CuratorFramework zkClient = null; //zk的客戶端工具Curator(在本類通過new實例化的是,自動start)private static final int MAX_RETRY_TIMES = 3; //定義失敗重試次數private static final int BASE_SLEEP_TIME_MS = 5000; //連接失敗后,再次重試的間隔時間 單位:毫秒private static final int SESSION_TIME_OUT = 1000000; //會話存活時間,根據業務靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服務所在的IP和客戶端端口private static final String NAMESPACE = "workspace";//指定后,默認操作的所有的節點都會在該工作空間下進行//本類通過new ZkCuratorUtil()時,自動連通zkClientpublic ZkConnectCuratorUtil() {RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次連接失敗后,重試策略zkClient = CuratorFrameworkFactory.builder()//.authorization("digest", "root:root".getBytes())//登錄超級管理(需單獨配).connectString(ZK_SERVER_IP_PORT).sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();zkClient.start();}public void closeZKClient() {if (zkClient != null) {this.zkClient.close();}}public static void main(String[] args) {ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();boolean ifStarted=zkUtil.zkClient.isStarted();System.out.println("當前客戶的狀態:" + (ifStarted ? "連接中" : "已關閉"));zkUtil.closeZKClient();boolean ifClose = zkUtil.zkClient.isStarted();System.out.println("當前客戶的狀態:" + (ifClose ? "連接成功" : "已關閉"));} } public class CuratorDao {//使用curator(遞歸)添加節點//級聯創建節點(原生API不支持/后臺客戶端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//創建父節點,如果需要的話.withMode(CreateMode.PERSISTENT) //指定節點是臨時的,還是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定節點的操作權限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "節點已成功創建…");}//使用curator(遞歸)刪除節點//刪除node節點及其子節點public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed() //保證刪除:如果刪除失敗,那么在后端還是繼續會刪除,直到成功.deletingChildrenIfNeeded() //級聯刪除子節點//.withVersion(1)//版本號可以據需使用.forPath(nodePath);System.out.println(nodePath + "節點已刪除成功…");}//使用curator更新節點數據//更新節點data數據public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本號據需使用,默認可以不帶System.out.println(nodePath + "節點數據已修改成功…");}//使用curator查詢節點數據//查詢node節點數據public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("節點" + nodePath + "的數據為" + new String(data));System.out.println("節點的版本號為:" + stat.getVersion());}//使用curator查詢節點的子節點//打印node子節點public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("開始打印子節點");for (String str : childNodes) {System.out.println(str);}}//使用curator判斷節點是否存在//判斷node節點是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "節點不存在" : "節點存在");}/**************使用Curator高級API特性之Cache緩存監控節點變化*************/@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}}5.3.8 使用Curator高級API特性之Cache緩存監控節點變化
cache是一種緩存機制,可以借助cache實現監聽。
簡單來說,cache在客戶端緩存了znode的各種狀態,當感知到zk集群的znode狀態變化,會觸發event事件,注冊的監聽器會處理這些事件。
curator支持的cache種類有4種Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用來觀察ZNode的子節點并緩存狀態,如果ZNode的子節點被創建,更新或者刪除,那么Path Cache會更新緩存,并且觸發事件給注冊的監聽器。
它是通過PathChildrenCache類來實現的,監聽器注冊是通過PathChildrenCacheListener。
2)Node Cache
Node Cache用來觀察ZNode自身,如果ZNode節點本身被創建,更新或者刪除,那么Node Cache會更新緩存,并觸發事件給注冊的監聽器。
它是通過NodeCache類來實現的,監聽器對應的接口為NodeCacheListener。
3)Tree Cache
Tree Cache是上兩種的合體,Tree Cache觀察的是自身+所有子節點的所有數據,并緩存所有節點數據。
它是通過TreeCache類來實現的,監聽器對應的接口為TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,該版本的出現是為了逐步淘汰上面3監聽。
它是通過CuratorCache類來實現的,監聽器對應的接口為CuratorCacheListener。
Curator一次性的watch
import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent;public class MyCuratorWatcher implements CuratorWatcher {@Overridepublic void process(WatchedEvent event) throws Exception {System.out.println("觸發watcher,節點路徑為:" + event.getPath());switch (event.getType()) {case NodeCreated:break;default:break;}} }//一次性的watchpublic static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}NodeCache監聽當前節點變化,通過NodeCacheListener接口持續監聽節點的變化來實現
//持續監聽的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把監聽節點,轉換為nodeCachenodeCache.start(false);//默認為false 設置為true時,會自動把節點數據存放到nodeCache中;設置為false時,初始化數據為空ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) {System.out.println("NodeCache節點的初始化數據為空……");}else {System.out.println("NodeCache節點的初始化數據為"+new String(cacheData.getData()));}//設置循環監聽nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata=nodeCache.getCurrentData();if(null==cdata) {System.out.println("節點發生了變化,可能剛剛被刪除!");nodeCache.close();//關閉監聽}else {String data=new String(cdata.getData());String path=nodeCache.getCurrentData().getPath();System.out.println("節點路徑"+path+"數據發生了變化,最新數據為:"+data);}}});}PathChildrenCache只監聽子節點變化
通過PathChildrenCacheListener接口持續監聽子節點來實現
TreeCache是上兩者的合體,既監聽自身,也監聽所有子節點變化
通過TreeCacheListener接口來實現
Curator Cache,是在zk3.6新版本添加的特性,Curator需5.+
它的出現是為了替換以上3個監聽(NodeCache、PathCache、TreeCache),它通過CuratorCacheListener.builder().for**來選擇對應的監聽。最后再通過curatorCache.listenable().addListener(listener);注冊監聽。
5.4 使用Curator創建/驗證ACL(訪問權限列表)
5.4.1 連通Zk時,就指定登錄權限
//本類代碼,只涉及ACL操作 public class CuratorAcl {public CuratorFramework client = null;public static final String workspace="workspace";public static final String zkServerPath = "192.168.31.216:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情況下,登錄賬號、密碼可以通過構造參數傳入,暫時固定,據需修改.connectString(zkServerPath).sessionTimeoutMs(20000).retryPolicy(retryPolicy).namespace(workspace).build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}} }5.4.2寫一個把明文的賬號密碼轉換為加密后的密文的工具類
//把明文的賬號密碼轉換為加密后的密文 public class AclUtils {public static String getDigestUserPwd(String loginId_Username_Passwd) {String digest = "";try {digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}return digest;}public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {String id = "mayun:mayun";String idDigested = getDigestUserPwd(id);System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=} }5.4.3使用自定義工具類AclUtils,一次性給多個用戶賦Acl權限
public static List<ACL> getAcls() throws NoSuchAlgorithmException{List<ACL> acls=new ArrayList<ACL>();Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));acls.add(new ACL(Perms.ALL, mayun));//給mayun一次性賦值所有權限acls.add(new ACL(Perms.READ, lilei));acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//給lilei分兩次賦權限(目的:看不同的賦權方式)return acls;}5.4.4級聯創建節點,并賦予節點操作權限
public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {String result=cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true)//給節點賦權限.forPath(nodePath, nodeData.getBytes());System.out.println("創建成功,result="+result); }5.4.5讀取節點數據
public void getNodeData(CuratorAcl cto,String nodePath) throws Exception {Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);if(null!=stat) {System.out.println("節點" + nodePath + "的數據為: " + new String(data));System.out.println("該節點的版本號為: " + stat.getVersion());}}5.4.6修改具有ACL權限節點的data數據
public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("節點修改后的數據為:"+nodeNewData);cto.client.setData().forPath(nodePath, nodeNewData.getBytes());System.out.println("修改成功");}5.4.7兩種方法判斷node節點是否存(優先使用第一種)
public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("-----------=================-------------");//判斷節點是否存在,方法一(路徑前面會自動添加workspace)Stat stat=cto.client.checkExists().forPath(nodePath);System.out.println("======="+stat==null?"不存在":"存在");//判斷節點是否存在,方法二(路徑前面需手動添加workspace)Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);System.out.println("======="+stat2==null?"不存在":"存在");}ACL權限的main方法測試
通過java代碼給某個節點添加ACL權限后,后臺登陸zk客戶端時,是無法直接操作該節點被ACL控制的權限的操作的,要想操作具有ACL權限的節點,方法只有兩個。
1、知道該節點輸入用戶都有哪些,用這些用戶的賬號密碼登錄
2、使用超級用戶登錄
#getAcl /succ/testDigest 查看都有哪些用戶對該節點有操作權限
#addauth digest succ:succ 登錄
7 分布式鎖
Curator的5種分布式鎖及其對應的核心類:
1.重入式排它鎖 Shared Reentrant Lock,實現類:InterProcessMutex
2.不可重入排它鎖 Shared Lock ,實現類:InterProcessSemaphoreMutex
3.可重入讀寫鎖 Shared Reentrant Read Write Lock,實現類: InterProcessReadWriteLock 、InterProcessLock
4.多鎖對象容器(多共享鎖) Multi Shared Lock,將多個鎖作為單個實體管理的容器,實現類:InterProcessMultiLock、InterProcessLock
5.共享信號鎖Shared Semaphore ,實現類:InterProcessSemaphoreV2
跨 JVM 工作的計數信號量。使用相同鎖路徑的所有 JVM 中的所有進程將實現進程間有限的租用集。此外,這個信號量大多是“公平的”——每個用戶將按照請求的順序獲得租用(從 ZK 的角度來看)。
有兩種模式可用于確定信號量的最大租用。在第一種模式中,最大租用是由給定路徑的用戶維護的約定。在第二種模式中,SharedCountReader 用作給定路徑的信號量的方法,以確定最大租用。
7.1.重入式排它鎖InterProcessMutex
public InterProcessMutex(CuratorFramework client, String path)
獲取/釋放鎖的API
public void acquire() throws Exception;//獲取鎖,獲取不到鎖一直阻塞,zk連接中斷則拋異常
public boolean acquire(long time, TimeUnit unit) throws Exception;//獲取鎖,超過該時間后,直接返回false,zk連接中斷則拋異常
public void release() throws Exception;//釋放鎖
通過release()方法釋放鎖。InterProcessMutex 實例可以重用。Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。為了撤銷mutex, 調用下面的方法
/**
- 將鎖設為可撤銷的. 當別的進程或線程想讓你釋放鎖時Listener會被調用。
- Parameters:
- listener - the listener
*/
public void makeRevocable(RevocationListener listener)
7.2.不可重入排它鎖InterProcessSemaphoreMutex
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
使用InterProcessSemaphoreMutex,調用方法類似,區別在于該鎖是不可重入的,在同一個線程中不可重入
7.3.可重入讀寫鎖InterProcessReadWriteLock 、InterProcessLock
一個讀寫鎖管理一對相關的鎖。一個負責讀操作,另外一個負責寫操作。讀操作在寫鎖沒被使用時可同時由多個進程使用,而寫鎖使用時不允許讀 (阻塞)。此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。從讀鎖升級成寫鎖是不成的。
7.4.多鎖對象容器(多共享鎖) ,將多個鎖作為單個實體管理,InterProcessMultiLock、InterProcessLock
Multi Shared Lock是一個鎖的容器。當調用acquire, 所有的鎖都會被acquire(上鎖),如果請求失敗,所有的鎖都會被release (釋放鎖)。同樣調用release時所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。主要涉及兩個類:InterProcessMultiLock、InterProcessLock
它的構造函數需要包含的鎖的集合,或者一組ZooKeeper的path。
public InterProcessMultiLock(List locks)
public InterProcessMultiLock(CuratorFramework client, List paths)
7.5.代碼
public class ZkLock {final static Logger log = LoggerFactory.getLogger(ZkLock.class);public CuratorFramework zkClient = null; // zk的客戶端工具Curator(在本類通過new實例化的是,自動start)private static final int BASE_SLEEP_TIME_MS = 1000; // 連接失敗后,再次重試的間隔時間 單位:毫秒private static final int MAX_RETRY_TIMES = 10; // 定義失敗重試次數private static final int SESSION_TIME_OUT = 1000000; // 會話存活時間,根據業務靈活指定 單位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服務所在的IP和客戶端端口private static final String NAMESPACE = "workspace";// 指定后,默認操作的所有的節點都會在該工作空間下進行static int j = 10;//初始化zk客戶端public ZkLock() {// 重試策略:初試時間為1s 重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);// 通過工廠建立連接zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 連接地址.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重試策略.build();zkClient.start();}public static void lockTest(CuratorFramework zkClient) throws InterruptedException {// 使用分布式鎖,所有系統同時監聽同一個節點,達到分布式鎖的目的final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {//啟動10個線程new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();// 線程等待一起執行lock.acquire();// 分布式鎖,數據同步// 處理業務j--;System.out.println(j);} catch (Exception e) {e.printStackTrace();} finally {try {// 釋放鎖lock.release();} catch (Exception e) {e.printStackTrace();}}}}, "t" + i).start();}Thread.sleep(1000);countDownLatch.countDown();// 模擬十個線程一起并發.指定一起執行}public static void main(String[] args) throws InterruptedException {ZkLock zkl = new ZkLock();ZkLock.lockTest(zkl.zkClient);} }8.分布式計數器
利用Zookeeper可以實現一個集群共享的計數器。只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器:DistributedAtomicInteger,DistributedAtomicLong。這個兩個除了計數范圍(int、long)不同外,沒有任何不同。操作也非常簡單,跟AtomicInteger大同小異。
increment() //加1
decrement() //減1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //獲取當前值
add():增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設置計數值
使用的時候,必須檢查返回結果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
另外Curator還有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式隊列DistributedQueueDistributed Queue
https://blog.csdn.net/succing/article/details/121779721
https://blog.csdn.net/succing/article/details/121793494
https://blog.csdn.net/succing/article/details/121844550
https://blog.csdn.net/succing/article/details/121802687
總結
以上是生活随笔為你收集整理的大数据之Zookeeper的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: deepTools对ChIP-seq数据
- 下一篇: Pascal voc 2012 数据集简