日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache ZooKeeper - 使用原生的API操作ZK

發布時間:2025/3/21 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache ZooKeeper - 使用原生的API操作ZK 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • maven依賴
  • 驗證
  • 測試基類
  • ZK構造函數參數
    • connectString:ZooKeeper服務器列表
    • sessionTimeout:會話的超時時間, “毫秒”為單位
    • watcher:事件通知處理器
    • canBeReadOnly: 用于標識當前會話是否支持“read-only(只讀)”模式。
    • sessionId和 sessionPasswd:會話ID和會話秘鑰
  • CRUD
    • 同步創建節點
    • 修改數據
    • 查詢數據(不帶watcher)
    • 查詢數據(帶watcher)
    • 刪除數據
    • 異步創建節點

概述

前面幾篇系列博文我們熟悉了如何通過命令來操作ZK節點數據,下面我們來看下如何使用API來操作

主要兩種方式

  • 原生API
  • Curator
  • 今天我們來看下如何使用原生的API操作ZK


    maven依賴

    和 服務端的版本保持一致

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>

    驗證

    接下來我們使用單元測試來驗證下原生API的對ZK 數據的增刪改查

    測試基類

    我們來寫下測試基類

    package com.artisan.zk.originalClient;import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before;import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;@Slf4j public abstract class StandAloneBaseTest {private static final String ZK_ADDRESS = "192.168.126.131:2181";private static final int SESSION_TIMEOUT = 30_000;private static CountDownLatch countDownLatch = new CountDownLatch(1);public static ZooKeeper getZooKeeper() {return zooKeeper;}private static ZooKeeper zooKeeper ;private static Watcher watcher = event -> {if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.None){log.info("ZK Connected");countDownLatch.countDown();}};@Beforepublic void init() throws IOException, InterruptedException {log.info("start to connect zk server: {}" , ZK_ADDRESS);zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, watcher);log.info("connecting to....{}", ZK_ADDRESS);countDownLatch.await();}@Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}

    為了方便測試,直接在init初始化方法中創建zookeeper實例 ,不要關閉~


    ZK構造函數參數

    connectString:ZooKeeper服務器列表

    由英文逗號分開的host:port字符串組成,每一個都代表一臺ZooKeeper機器,如 host1:port1,host2:port2,host3:port3

    另外,也可以在connectString中設置客戶端連接上ZooKeeper后的根目錄,方法是在host:port字符串之后添加上這個根目錄。
    例如,host1:port1,host2:port2,host3:port3/app/a,這樣就指定了該客戶端連接上ZooKeeper服務器之后,所有對ZooKeeper的操作,都會基于這個根目錄。
    例如,客戶端對/foo/bar 的操作,最終創建/app/a/foo/bar, 這個目錄也叫Chroot,即客戶端隔離命名空間。


    sessionTimeout:會話的超時時間, “毫秒”為單位

    在ZooKeeper中有會話的概念,在一個會話周期內,ZooKeeper客戶端和服務器之間會通過心跳檢測機制來維持會話的有效性.

    一旦在sessionTimeout時間內沒有進行有效的心跳檢測,會話就會失效。


    watcher:事件通知處理器

    ZooKeeper允許客戶端在構造方法中傳入一個接口 watcher (org.apache. zookeeper.Watcher)的實現類對象來作為默認的 Watcher事件通知處理器。

    當然,該參數可以設置為null 以表明不需要設置默認的 Watcher處理器。


    canBeReadOnly: 用于標識當前會話是否支持“read-only(只讀)”模式。

    boolean類型的參數
    默認情況下,在ZooKeeper集群中,一個機器如果和集群中過半及以上機器失去了網絡連接,那么這個機器將不再處理客戶端請求(包括讀寫請
    求)。

    但是在某些使用場景下,當ZooKeeper服務器發生此類故障的時候,我們還是希望ZooKeeper服務器能夠提供讀服務(當然寫服務肯定無法提供),這就是 ZooKeeper的“read-only”模式。


    sessionId和 sessionPasswd:會話ID和會話秘鑰

    這兩個參數能夠唯一確定一個會話,同時客戶端使用這兩個參數可以實現客戶端會話復用,從而達到恢復會話的效果。
    具體使用方法是,第一次連接上ZooKeeper服務器時,通過調用ZooKeeper對象實例的以下兩個接口,即可獲得當前會話的ID和秘鑰:

    long getSessionId(); byte[]getSessionPasswd( );

    荻取到這兩個參數值之后,就可以在下次創建ZooKeeper對象實例的時候傳入構造方法了


    CRUD

    同步創建節點

    package com.artisan.zk.originalClient;import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Test;@Slf4j public class BaseOperationStandAloneModeTest extends StandAloneBaseTest{private static final String NODE_NAME = "/artisan-node";@Testpublic void testCreate(){try{ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(NODE_NAME,"artisan-node-value".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("create persistent node {} , result {}" , NODE_NAME, s );}catch (Exception e){log.error("create Exception {}", e.getMessage());}} }


    修改數據

    @SneakyThrows@Testpublic void testSetData() {// 修改前數據Stat stat = new Stat();byte[] data = getZooKeeper().getData(NODE_NAME, null, stat);log.info("data before change: " + new String(data));int version = stat.getVersion();log.info("data version {} " , version);// 修改數據Stat newStat = getZooKeeper().setData(NODE_NAME, "ARTISAN - NEW-SET-DATA".getBytes(), version);log.info("new stat version info {} " , newStat.getVersion());log.info("data after change: {} " , new String(getZooKeeper().getData(NODE_NAME, null, newStat)));}



    查詢數據(不帶watcher)

    @SneakyThrows@Testpublic void testGetWithOutWatch(){byte[] data = getZooKeeper().getData(NODE_NAME, null, null);log.info("data {}" , new String(data));}


    查詢數據(帶watcher)

    @SneakyThrows@Testpublic void testGetWithWatch(){Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {// 監聽NodeDataChanged事件if (event.getPath() != null && event.getPath().equals(NODE_NAME)&& event.getType() == Watcher.Event.EventType.NodeDataChanged){log.info("path {} changed watched " , NODE_NAME);// 監聽一旦觸發就會失效,因此需要重新監聽try {byte[] data = getZooKeeper().getData(NODE_NAME, this, null);log.info("監聽觸發后的操作-- data: {}",new String(data));} catch (Exception e) {log.info("getData Error {} " , e.getMessage());}}}};// 獲取節點數據byte[] data = getZooKeeper().getData(NODE_NAME, watcher, null);log.info("data {}" , new String(data));}

    因為監聽的是NodeDataChanged事件,因此我們再去調用修改數據的方法,或者在客戶端手動修改數據

    觀察testGetWithWatch的日志

    zk里查看數據


    刪除數據

    @SneakyThrows@Testpublic void testDelete(){// if the given version is -1, it matches any node's versions// -1 代表匹配所有版本,直接刪除// 任意大于 -1 的代表可以指定數據版本刪除getZooKeeper().delete(NODE_NAME,-1);}

    查看客戶端,已經刪除


    異步創建節點

    @SneakyThrows@Testpublic void testCreateAsyn(){getZooKeeper().create(NODE_NAME, "DATA_VALUE".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> {String currentThreadName = Thread.currentThread().getName();log.info("currentThreadName {} , rc {} , path {} , ctx {} , name {} " , currentThreadName, rc , path ,ctx ,name );}, "ARTISAN");byte[] data = getZooKeeper().getData(NODE_NAME, null, null);log.info("data {}" , new String(data));}

    EventThread創建的節點 ,而非當前線程


    行了 基本操作就這些,下篇繼續

    總結

    以上是生活随笔為你收集整理的Apache ZooKeeper - 使用原生的API操作ZK的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。