1.ZooKeeper Java客户端的基本使用「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」
相關歷史文章(閱讀本文前,您可能需要先看下之前的系列👇)
國內最全的Spring?Boot系列之四
享元模式:共享女友?-?第355篇
什么是?ZooKeeper?-?第347篇
ZooKeeper安裝?-?第348篇
ZooKeeper數據結構和實操??-?第349篇
ZooKeeper的watch機制?-?第350篇
ZooKeeper的acl權限控制??-?第351篇
ZooKeeper內存數據和持久化??-?第352篇
ZooKeeper集群搭建?-?第354篇
ZooKeeper?Java客戶端的基本使用?-?第356篇
ZooKeeper客戶端Curator?-?第358篇
ZooKeeper客戶端Curator的進階使用?-?第359篇
ZooKeeper客戶端Curator實現Watch事件監聽?-?第361篇
Spring Boot 使用 Curator 操作 ZooKeeper - 第363篇
Spring Boot使用Apache Curator實現服務的注冊和發現 - 第364篇
Spring Boot使用Apache Curator實現分布式鎖(可重入排它鎖) - 第365篇
Spring Boot使用Apache Curator實現leader選舉 - 第366篇
Spring Boot使用Apache Curator實現分布式計數器 - 367篇
ZooKeeper Session 基本原理 - 第369篇
ZooKeeper分桶策略實現高性能的會話管理 - 第371篇
ZooKeeper集群架構以及讀寫原理 - 第372篇
這節我們來看看通過java代碼怎么和ZK進行交互,這里主要使用apache提供的org.apache.zookeeper來進行和zk server進行連接的。
一、client : zookeeper
1.1 說明
?????? 現在我們要使用代碼的方式進行連接到zk server,那么我們一個zookeeper java客戶端的依賴包,那這里我們使用的就是apache提供的zookeeper.jar。
?????? 在zookeeper提供了和zk server交互的核心類就是ZooKeeper,接下來我們看下的步驟:
(1)創建項目;
(2)在pom.xml文件中添加zookeeper的依賴包;
(3)編寫一個測試類,實現創建一個節點和獲取一個節點;
1.2 開發環境
(1)操作系統:Mac OS;
(2)ZK Server : 3.6.2;
(3)開發工具:idea;
(4)JDK:1.8
1.3 hello小栗子
1.3.1 創建一個項目
?????? 使用idea創建一個maven project,取名為:zookeeper-java。
1.3.2 添加依賴
?????? 在pom.xml文件中添加依賴,zookeeper(zk client包)和junit(單元測試包):
<!-- zookeeper java客戶端,選擇的版本號最好是和zk的服務端是相同的版本,避免引發奇奇怪怪的問題 --> <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.2</version> </dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope> </dependency>說明:zookeeper的版本最好是和服務端的版本是一樣的。
1.3.3 編寫例子
(1)和zk server建立連接:
String connectString = "127.0.0.1:2181"; int timeout = 4000;/** 連接過程是異步的* 底層是使用守護線程,當沒有業務線程執行的話,那么守護線程也就結束了。* 這里也就是main線程結束守護線程也就結束了。*/ ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {public void process(WatchedEvent event) {if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){System.out.println("連接已經建立");}} });說明:
① 連接過程是異步的;
② 底層是使用守護線程,當沒有業務線程執行的話,那么守護線程也就結束了。這里也就是main線程結束守護線程也就結束了。
③ 如何避免main線程不結束呢?使用線程的sleep,休眠一下。
?????? 集群的情況下,多個地址使用逗號分隔:
String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";(2)創建節點
?????? 有了zooKeeper對象之后,就可以使用這個對象的create方法創建節點了:
String path = "/myconfig"; byte[] bytes = new String("hello").getBytes(); //節點不能多次創建,多次創建會報錯: KeeperErrorCode = NodeExists for /myconfig String rs = zooKeeper.create(path,bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //result:/myconfig System.out.println("result:"+rs);說明:
① 創建節點調用的方法是:zooKeeper.create(),第一個參數是節點的path;第二個參數是byte[] data;第三個參數是:ACl權限信息;第四個參數是:節點的類型;
② 節點不能多次創建,否則會報錯:KeeperErrorCode = NodeExists for /myconfig。
(3)獲取節點的數據:
?????? 獲取節點的數據是getData,這里有一個Watcher可以監聽到數據的變化:
//通過create創建數據,通過get獲取數據 //這種方式只能監聽一次 byte[] dataBytes = zooKeeper.getData(path,new Watcher(){public void process(WatchedEvent event) {if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){System.out.println("數據改變了:"+event.getPath());}} },null); System.out.println("獲取到的數據是:"+ new String(dataBytes));整個類的代碼如下:
package com.kfit;import org.apache.zookeeper.*; import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;/*** TODO** @author 悟纖「公眾號SpringBoot」* @date 2021-03-16* @slogan 大道至簡 悟在天成*/ public class ZkJavaClientDemo {public static void main(String[] args) throws Exception {String connectString = "127.0.0.1:2181";int timeout = 4000;/** 連接過程是異步的* 底層是使用守護線程,當沒有業務線程執行的話,那么守護線程也就結束了。* 這里也就是main線程結束守護線程也就結束了。*/ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {public void process(WatchedEvent event) {if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){System.out.println("連接已經建立");}}});System.out.println("開始創建節點...");String path = "/myconfig";byte[] bytes = new String("hello").getBytes();//節點不能多次創建,多次創建會報錯: KeeperErrorCode = NodeExists for /myconfigString rs = zooKeeper.create(path,bytes, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//result:/myconfigSystem.out.println("result:"+rs);//通過create創建數據,通過get獲取數據//這種方式只能監聽一次byte[] dataBytes = zooKeeper.getData(path,new Watcher(){public void process(WatchedEvent event) {if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){System.out.println("數據改變了:"+event.getPath());}}},null);System.out.println("獲取到的數據是:"+ new String(dataBytes));/** 通過sleep 避免主線程結束*/Thread.sleep(Integer.MAX_VALUE);System.out.println("main end.");}}?????? 運行main方法:
?????? 這時候我們通過zkCli.sh進行訪問zk server,然后修改節點數據/myconfig:
set /myconfig hello1
?
1.3.4 永久監聽
我們發現現在我們的編碼只能監聽一次,如何實現永久監聽呢?很簡單,只要在監聽的代碼里再次監聽就可以了,很簡單:
Watcher watcher = new Watcher(){public void process(WatchedEvent event) {if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){System.out.println("數據改變了:"+event.getPath());try {byte[] dataBytes = zooKeeper.getData(path,this,null);System.out.println("獲取到的數據是:"+ new String(dataBytes));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}} }; byte[] dataBytes = zooKeeper.getData(path,watcher,null); System.out.println("獲取到的數據是:"+ new String(dataBytes));?????? 多次執行set 都是能夠監聽到的。
1.3.5 永久監聽方式二
?????? 上面的方式不管是之前的版本還是最新的版本都是通用的,如果是3.6.x版本的是支持永久遞歸監聽的,那么怎么玩吶?也很簡單:
zooKeeper.addWatch(path, event -> {System.out.println("獲取到的數據是:"+ event); },AddWatchMode.PERSISTENT);event這里使用了Lambda表達式,非Lambda的寫法是:
zooKeeper.addWatch(path, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("獲取到的數據是:"+ event);} }, AddWatchMode.PERSISTENT);?????? 這里的AddWatchMode可以是PERSISTENT和PERSISTENT_RECURSIVE。
二、進階的使用
2.1 修改節點
?????? 修改節點使用的是set的操作,我們看下簡單的例子:
//path = "/myconfig";byte[] bytes = new String("hello-update").getBytes(); Stat stat = zooKeeper.setData(path,bytes,-1);說明:
(1)執行set的時候,如果node不存在,會拋出異常:KeeperErrorCode = NoNode for /myconfig。在執行zooKeeper.setData()就會拋出異常,就不會有返回值Stat了
(2)這里的第三個參數是version:
?????? ① 如果不考慮并發修改的問題的話,那么version=-1;
??? ② 如果填寫的version和我們節點的version對不上的話,那么執行setData會報錯:KeeperErrorCode = BadVersion for /myconfig。
2.2 并發修改節點
?????? 對于節點的修改,會出現多個線程進行并發修改的問題,那么我們控制并發節點的修改問題吶,很簡單,在前面的例子中的第三個參數version就是用來解決節點的并發修改問題的。具體的一個修改思路:
(1)、通過getData獲取到節點的版本信息;
(2)、在執行setData的時候,傳遞當前獲取到的版本號;
?????? 具體的代碼如下:
Stat nodeStat = new Stat(); zooKeeper.getData(path, false,nodeStat);byte[] dataBytes = new String("hello-update").getBytes(); zooKeeper.setData(path,dataBytes,nodeStat.getVersion());?
2.3 刪除節點
?????? 刪除節點很簡單,我們直接來看下代碼:
zooKeeper.delete(path, -1);版本號的說明:-1 代表匹配所有版本號,直接刪除。任意大于-1的代表可以指定數據版本刪除。
2.4 異步獲取數據
?????? 我們之前getData的方式是同步的,那么如何異步獲取呢?對于zookeeper也提供了相應的方式:
@Overridepublic void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {System.out.println(Thread.currentThread().getName());//main-EventThreadSystem.out.println("ctx="+ctx);//ctx=1000System.out.println("data="+new String(data));//data=hello} }, "1000");三、小結
(1)使用java client的核心思路:引jar包,使用zookeeper類連接上ZK Server,然后就可以調用create/get/set/delete等操作節點的方法。
(2)對于節點的監聽watcher操作,只監聽一次,可以使用循環監聽的方式進行永久監聽;在3.6.x的版本可以使用addWatch方法永久遞歸監聽。
(3)不能遞歸創建和刪除節點。
越分享越成長,越分享越自信。
通過本文,你最大的是收獲是什么?請在留言區分享你的收獲。
我就是我,是顏色不一樣的煙火。 我就是我,是與眾不同的小蘋果。à悟空學院:https://t.cn/Rg3fKJD
學院中有Spring?Boot相關的課程!!
SpringBoot視頻:http://t.cn/A6ZagYTi
SpringBoot交流平臺:https://t.cn/R3QDhU0
SpringSecurity5.0視頻:http://t.cn/A6ZadMBe
ShardingJDBC分庫分表:http://t.cn/A6ZarrqS
分布式事務解決方案:http://t.cn/A6ZaBnIr
JVM內存模型調優實戰:http://t.cn/A6wWMVqG
Spring入門到精通:https://t.cn/A6bFcDh4
大話設計模式之愛你:https://dwz.cn/wqO0MAy7
總結
以上是生活随笔為你收集整理的1.ZooKeeper Java客户端的基本使用「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TCP相关的面试内容整理
- 下一篇: java游戏运行环境_Java运行环境