3.ZooKeeper客户端Curator的进阶使用「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」
相關歷史文章(閱讀本文前,您可能需要先看下之前的系列👇)
國內最全的Spring?Boot系列之四
享元模式:共享女友?-?第355篇
什么是?ZooKeeper?-?第347篇
ZooKeeper安裝?-?第348篇
ZooKeeper數據結構和實操??-?第349篇
ZooKeeper的watch機制?-?第350篇
ZooKeeper的acl權限控制??-?第351篇
ZooKeeper內存數據和持久化??-?第352篇
ZooKeeper集群搭建?-?第354篇
ZooKeeper?Java客戶端的基本使用?-?第356篇
ZooKeeper客戶端Curator?-?第358篇
ZooKeeper客戶端Curator的進階使用?-?第359篇
ZooKeeper客戶端Curator實現Watch事件監聽?-?第361篇
Spring Boot 使用 Curator 操作 ZooKeeper - 第363篇
Spring Boot使用Apache Curator實現服務的注冊和發現 - 第364篇
Spring Boot使用Apache Curator實現分布式鎖(可重入排它鎖) - 第365篇
Spring Boot使用Apache Curator實現leader選舉 - 第366篇
Spring Boot使用Apache Curator實現分布式計數器 - 367篇
ZooKeeper Session 基本原理 - 第369篇
ZooKeeper分桶策略實現高性能的會話管理 - 第371篇
ZooKeeper集群架構以及讀寫原理 - 第372篇
在前面我們對于Curator有了一個基本的了解和使用,這一節我們看一下更深入的知識。
一、Curator進階使用
1.1 Curator構建CuratorFramework推薦方式
?????? 在前面我們說到了,第二種方式會比較好,直接上源碼:
/*方式二:* connectionString zk地址* sessionTimeoutMs 會話超時時間* connectionTimeoutMs 連接超時時間* namespace 每個curatorFramework 可以設置一個獨立的命名空間,之后操作都是基于該命名空間,比如操作 /user/message 其實操作的是/curator/user/message* retryPolicy 重試策略*/ String connectString = "127.0.0.1:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(60000).connectionTimeoutMs(15000).namespace("curator").retryPolicy(retryPolicy).build(); curatorFramework.start();1.2 連接成功的監聽
?????? 我們創建連接是否成功可以通過方法進行監聽:
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if(newState == ConnectionState.CONNECTED){System.out.println("連接成功!");}} });?????? 添加監聽的代碼需要放到curatorFramework.start();代碼之前。
1.3 創建節點
?????? 上面使用create進行節點的創建,但是如果節點存在的話,也是會報錯的:
創建一個 允許所有人訪問的 持久節點由于使用了namespace,所以最終創建的節點是:/curator/test2*/ curatorFramework.create().creatingParentsIfNeeded()//遞歸創建,如果沒有父節點,自動創建父節點.withMode(CreateMode.PERSISTENT)//節點類型,持久節點.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//設置ACL,和原生API相同.forPath("/test2", "hello".getBytes());說明:由于使用了namespace,所以最終創建的節點是:/curator/test2
?
?????? 假設我們多次創建節點的話,是會保存的,那么怎么辦吶?
?????? 可以先判斷節點是否存在,在決定是否進行創建:
/*創建一個 允許所有人訪問的 持久節點由于使用了namespace,所以最終創建的節點是:/curator/test2*/ String path = "/test3"; Stat stat = curatorFramework.checkExists().forPath(path); if(stat == null){curatorFramework.create().creatingParentsIfNeeded()//遞歸創建,如果沒有父節點,自動創建父節點.withMode(CreateMode.PERSISTENT)//節點類型,持久節點.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//設置ACL,和原生API相同.forPath(path, "hello".getBytes()); }?????? protection模式,防止由于異常原因,導致僵尸節點:
curatorFramework.create().creatingParentsIfNeeded()//遞歸創建,如果沒有父節點,自動創建父節點.withProtection()//protection模式,防止由于異常原因,導致僵尸節點。.withMode(CreateMode.PERSISTENT_SEQUENTIAL)//節點類型,持久節點.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//設置ACL,和原生API相同.forPath("/test4/id-", "hello".getBytes());?????? 當我們創建節點是持久有序節點的時候,當我們提供請求給ZK Server,ZK Server創建成功了之后會給客戶端進行反饋,但在反饋的過程中,由于網絡原因導致客戶端沒有收到反饋結果,由于curator有重試的機制,會造次進行發起請求進行創建,那么就會出現之前的節點沒有被使用的情況,也就是所謂的僵尸節點。如果我們配置了withProtection()的話,那么就能夠解決僵尸節點的問題。
?????? 如果客戶端所連接的服務器崩潰了,但還沒來得及返回客戶端所創建的有序節點的節點名稱(即節點序列號),或者客戶端只是連接丟失,客戶端沒接收到所請求操作的響應信息,結果,客戶端并不知道所創建的znode節點路徑名稱。回憶對于有序節點的應用場景,例如,建立一個有序的所有客戶端列表。為了解決這個問題,CreateBuilder提供了一個 withProtection 方法來通知Curator客戶端,在創建的有序節點前添加一個唯一標識符,如果create操作失敗了,客戶端就會開始重試操作,而重試操作的一個步驟就是驗證是否存在一個節點包含這個唯一標識符。
?????? 那么具體是什么原理呢?
?????? 我看下底層的代碼CreateBuilderImpl:
?????? 跟進setProtectdMode方法:
?????? 說白了調用withProtection方法就是初始化了一個protectedId。那這個家伙又是怎么使用的吶?
?????? 我們看下生成的數據:
?????? 這個protectedId被當做了節點path的前綴了。
?????? 通過這里我們就可以分析出來,底層應該是每次都會把protectedId進行拼湊成一個新的path,重試操作的一個步驟就是驗證是否存在一個節點包含這個唯一標識符。。
1.4 異步獲取節點數據
?????? 獲取節點可以使用get進行獲取,如果是異步獲取的,使用inBackground:
curatorFramework.getData().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println(event.getPath()+":"+new String(event.getData()));} }).forPath("/test3");?????? 對于inBackground還可以指定我們自己的線程池對象:
public T inBackground(BackgroundCallback callback, Executor executor);?????? 這個大家可以去嘗試一下。
1.5 修改節點
?????? 修改節點就很簡單了:
curatorFramework.setData().forPath("/test3", "hello-update".getBytes());??????? 根據版本進行修改的話:
Stat stat = curatorFramework.checkExists().forPath("/test3"); curatorFramework.setData().withVersion(stat.getVersion()).forPath("/test3", "hello-update1".getBytes());?
1.6 刪除節點
?????? 刪除節點使用delete().forPath():
/*會刪除節點:/curator/test4以及/curator/test4下的子節點*/ curatorFramework.delete().deletingChildrenIfNeeded().forPath("/test4");?
我就是我,是顏色不一樣的煙火。 我就是我,是與眾不同的小蘋果。à悟空學院:https://t.cn/Rg3fKJD
學院中有Spring?Boot相關的課程!!
SpringBoot視頻:http://t.cn/A6ZagYTi
SpringBoot交流平臺:https://t.cn/R3QDhU0
SpringSecurity5.0視頻:http://t.cn/A6ZadMBe
ShardingJDBC分庫分表:http://t.cn/A6ZarrqS
分布式事務解決方案:http://t.cn/A6ZaBnIr
JVM內存模型調優實戰:http://t.cn/A6wWMVqG
Spring入門到精通:https://t.cn/A6bFcDh4
大話設計模式之愛你:https://dwz.cn/wqO0MAy7
總結
以上是生活随笔為你收集整理的3.ZooKeeper客户端Curator的进阶使用「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 线程安全和可重入函数的联系与区别
- 下一篇: java事件类_关于Java事件类的一些