[Curator] Path Cache 的使用与分析
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
Path Cache
Path Cache其實(shí)就是用于對(duì)zk節(jié)點(diǎn)的監(jiān)聽。不論是子節(jié)點(diǎn)的新增、更新或者移除的時(shí)候,Path Cache都能對(duì)子節(jié)點(diǎn)集合的狀態(tài)和數(shù)據(jù)變化做出響應(yīng)。
1. 關(guān)鍵 API
org.apache.curator.framework.recipes.cache.PathChildrenCache
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. 機(jī)制說明
PathChildrenCache內(nèi)部使用一個(gè)命令模式來封裝各種操作:
- 操作接口:org.apache.curator.framework.recipes.cache.Operation
- 刷新操作:org.apache.curator.framework.recipes.cache.RefreshOperation
- 觸發(fā)事件操作:org.apache.curator.framework.recipes.cache.EventOperation
- 獲取數(shù)據(jù)操作:org.apache.curator.framework.recipes.cache.GetDataOperation
而這些操作對(duì)象,都在構(gòu)造器中接受PathChildrenCache引用,這樣可以在操作中,處理cache(回調(diào)):
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {this.cache = cache;this.event = event; } GetDataOperation(PathChildrenCache cache, String fullPath) {this.cache = cache;this.fullPath = PathUtils.validatePath(fullPath); } RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {this.cache = cache;this.mode = mode; }而這些操作,還使用了一個(gè)單線程的線程池來調(diào)用,從而形成了異步調(diào)用。
- 使用了一個(gè)private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());來作為線程池的任務(wù)接收隊(duì)列
- 使用set,避免了并發(fā)情況下重復(fù)操作
- 由于單線程,使得各種操作都是按序執(zhí)行的
- 所以為了避免curator的監(jiān)聽機(jī)制阻塞
- 在childrenWatcher以及dataWatcher中,都使用異步執(zhí)行命令的方式
觸發(fā)操作:
void offerOperation(final Operation operation) {if ( operationsQuantizer.add(operation) ){submitToExecutor(new Runnable(){@Overridepublic void run(){try{operationsQuantizer.remove(operation);operation.invoke();}catch ( InterruptedException e ){//We expect to get interrupted during shutdown,//so just ignore these eventsif ( state.get() != State.CLOSED ){handleException(e);}Thread.currentThread().interrupt();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}});} } private synchronized void submitToExecutor(final Runnable command) {if ( state.get() == State.STARTED ){executorService.submit(command);} }- 考慮到了各種操作的中斷
- 考慮到了狀態(tài)
- 統(tǒng)一操作的異常處理
- 投遞方法submitToExecutor使用了synchronized
- 因?yàn)榭赡鼙O(jiān)聽器觸發(fā),所以需要對(duì)狀態(tài)進(jìn)行檢查
- 如先關(guān)閉,然后再被某個(gè)監(jiān)聽器回掉,導(dǎo)致不必要的操作
- 而檢查動(dòng)作不是原子的,所以需要同步鎖
- 因?yàn)榭赡鼙O(jiān)聽器觸發(fā),所以需要對(duì)狀態(tài)進(jìn)行檢查
3. 用法
3.1 創(chuàng)建
public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)- cacheData
- 如果設(shè)置true,是否需要緩存數(shù)據(jù)
3.2 使用
- Cache必須在使用前調(diào)用start()方法
- 有兩個(gè)start()方法
- void start()
- 無參
- void start(PathChildrenCache.StartMode mode)
- 可以通過參數(shù),選擇如何初始化
- StartMode
- NORMAL
- BUILD_INITIAL_CACHE
- POST_INITIALIZED_EVENT
- public void addListener(PathChildrenCacheListener listener)
4. 錯(cuò)誤處理
PathChildrenCache實(shí)例會(huì)通過ConnectionStateListener監(jiān)聽鏈接狀態(tài)。 如果鏈接狀態(tài)發(fā)生變化,緩存會(huì)被重置(PathChildrenCacheListener會(huì)受到一個(gè)RESET事件)
5. 源碼分析
5.1 類定義
public class PathChildrenCache implements Closeable{}- 實(shí)現(xiàn)了java.io.Closeable接口
5.2 成員變量
public class PathChildrenCache implements Closeable {private final Logger log = LoggerFactory.getLogger(getClass());private final CuratorFramework client;private final String path;private final CloseableExecutorService executorService;private final boolean cacheData;private final boolean dataIsCompressed;private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final EnsureContainers ensureContainers;private enum State{LATENT,STARTED,CLOSED}private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");private volatile Watcher childrenWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));}};private volatile Watcher dataWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}};@VisibleForTestingvolatile Exchanger<Object> rebuildTestExchanger;private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener(){@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);}};private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); }- log
- client
- path
- 緩存對(duì)應(yīng)的zk節(jié)點(diǎn)路徑
- executorService
- org.apache.curator.utils.CloseableExecutorService
- 線程池
- 用以執(zhí)行各種操作
- 參見第2章節(jié)
- cacheData
- 是否需要緩存數(shù)據(jù)
- dataIsCompressed
- 數(shù)據(jù)是否已壓縮
- listeners
- org.apache.curator.framework.listen.ListenerContainer
- 監(jiān)聽器容器(管理多個(gè)監(jiān)聽器)
- 業(yè)務(wù)監(jiān)聽器
- 可以添加自己的監(jiān)聽器
- currentData
- java.util.concurrent.ConcurrentMap
- 當(dāng)前數(shù)據(jù)
- <String, ChildData>
- 存放著多個(gè)org.apache.curator.framework.recipes.cache.ChildData
- initialSet
- AtomicReference
- 初始化集合
- 放置節(jié)點(diǎn),以此來跟蹤各個(gè)節(jié)點(diǎn)是否初始化
- 如果全部節(jié)點(diǎn)都初始化完成,則會(huì)觸發(fā)PathChildrenCacheEvent.Type.INITIALIZED事件
- operationsQuantizer
- 相當(dāng)于線程池的任務(wù)接收隊(duì)列
- state
- 狀態(tài)
- AtomicReference
- ensureContainers
- org.apache.curator.framework.EnsureContainers
- 可以線程安全的創(chuàng)建path節(jié)點(diǎn)
- State
- 內(nèi)部枚舉
- LATENT
- STARTED
- CLOSED
- 內(nèi)部枚舉
- NULL_CHILD_DATA
- 私有常量
- 空數(shù)據(jù)節(jié)點(diǎn)
- USE_EXISTS
- 私有常量
- 使用系統(tǒng)配置中curator-path-children-cache-use-exists的值
- childrenWatcher
- volatile
- 子節(jié)點(diǎn)變動(dòng)的監(jiān)聽器
- dataWatcher
- volatile
- 數(shù)據(jù)變動(dòng)監(jiān)聽器
- rebuildTestExchanger
- java.util.concurrent.Exchanger
- 用于并發(fā)線程間傳值
- 在重建緩存時(shí)通過此對(duì)象傳遞一個(gè)信號(hào)對(duì)象
- 用于測(cè)試
- connectionStateListener
- 鏈接狀態(tài)監(jiān)聽器
- defaultThreadFactory
- 線程工廠
5.3 構(gòu)造器
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {this.client = client;this.path = PathUtils.validatePath(path);this.cacheData = cacheData;this.dataIsCompressed = dataIsCompressed;this.executorService = executorService;ensureContainers = new EnsureContainers(client, path); }有7個(gè)構(gòu)造器,最終都是調(diào)用最后一個(gè)。不過從中也可以看出:
- 默認(rèn)使用newSingleThreadExecutor單線程線程池
- 默認(rèn)不對(duì)數(shù)據(jù)進(jìn)行壓縮處理
5.4 啟動(dòng)
緩存在使用前需要調(diào)用start()
public enum StartMode{NORMAL,BUILD_INITIAL_CACHE,POST_INITIALIZED_EVENT}public void start() throws Exception {start(StartMode.NORMAL); }@Deprecated public void start(boolean buildInitial) throws Exception {start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL); }public void start(StartMode mode) throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");mode = Preconditions.checkNotNull(mode, "mode cannot be null");client.getConnectionStateListenable().addListener(connectionStateListener);switch ( mode ){case NORMAL:{offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));break;}case BUILD_INITIAL_CACHE:{rebuild();break;}case POST_INITIALIZED_EVENT:{initialSet.set(Maps.<String, ChildData>newConcurrentMap());offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));break;}} }private void processChildren(List<String> children, RefreshMode mode) throws Exception {Set<String> removedNodes = Sets.newHashSet(currentData.keySet());for ( String child : children ) {removedNodes.remove(ZKPaths.makePath(path, child));}for ( String fullPath : removedNodes ){remove(fullPath);}for ( String name : children ){String fullPath = ZKPaths.makePath(path, name);if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ){getDataAndStat(fullPath);}updateInitialSet(name, NULL_CHILD_DATA);}maybeOfferInitializedEvent(initialSet.get()); }- 無參的start()
- 默認(rèn)使用StartMode.NORMAL策略
- 不建議使用的start(boolean buildInitial)
- true
- 使用StartMode.BUILD_INITIAL_CACHE策略
- false
- 使用StartMode.NORMAL策略
- true
- 啟動(dòng)時(shí)添加了鏈接狀態(tài)的監(jiān)聽器
可以看到啟動(dòng)過程有三種策略:
- 使用RefreshMode.STANDARD刷新模式
- 調(diào)用org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh方法
- 調(diào)用org.apache.curator.framework.EnsureContainers#ensure創(chuàng)建節(jié)點(diǎn)
- 在節(jié)點(diǎn)上添加childrenWatcher監(jiān)聽器
- 回調(diào)觸發(fā)org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren進(jìn)行刷新
- 清理掉已緩存在本地的數(shù)據(jù)中的其他節(jié)點(diǎn)
- 篩選出不是本cache的數(shù)據(jù)節(jié)點(diǎn)
- 從本地初始集合中清理掉
- PathChildrenCacheEvent.Type.CHILD_ADDED事件
- PathChildrenCacheEvent.Type.CHILD_UPDATED事件
- NORMAL模式下,這里為空
- 可以參見POST_INITIALIZED_EVENT模式
- 重新查詢所有需要的數(shù)據(jù)
- 不會(huì)觸發(fā)任何事件
- 逐個(gè)讀取節(jié)點(diǎn)數(shù)據(jù)和狀態(tài)
- 構(gòu)建ChildData放入currentData
- 參見NORMAL模式,但不同的是
- 更新initialSet時(shí)
- 如果initialSet的Map不為空
- POST_INITIALIZED_EVENT模式下,這里已經(jīng)初始化了Map
- 如果initialSet中的數(shù)據(jù)都已經(jīng)同步完成(都不等于NULL_CHILD_DATA)
- 將initialSet制空
- 觸發(fā)PathChildrenCacheEvent.Type.INITIALIZED事件
5.5 節(jié)點(diǎn)發(fā)生變化
在啟動(dòng)start()已經(jīng)給path上增加了一個(gè)監(jiān)聽器childrenWatcher
private volatile Watcher childrenWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));} };- 以RefreshMode.STANDARD模式刷新緩存
- 會(huì)對(duì)本地的緩存數(shù)據(jù)和zk節(jié)點(diǎn)做比較
- 只是處理新的緩存數(shù)據(jù)
- 注意操作的參數(shù)PathChildrenCache.this
- this不同了
5.6 數(shù)據(jù)發(fā)生變化
在每次獲取緩存數(shù)據(jù)時(shí)(getDataAndStat方法),在每個(gè)緩存上添加了監(jiān)聽器dataWatcher:
private volatile Watcher dataWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}} };- 節(jié)點(diǎn)刪除時(shí)
- 清理緩存
- 觸發(fā)PathChildrenCacheEvent.Type.CHILD_REMOVED事件
- 數(shù)據(jù)發(fā)生變化時(shí)
- 執(zhí)行GetDataOperation操作
- 也就是再次執(zhí)行g(shù)etDataAndStat方法
- 執(zhí)行GetDataOperation操作
- 注意操作的參數(shù)PathChildrenCache.this
- this不同了
5.7 獲取當(dāng)前數(shù)據(jù)
public List<ChildData> getCurrentData() {return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values())); }public ChildData getCurrentData(String fullPath) {return currentData.get(fullPath); }都是從本地?cái)?shù)據(jù)中獲取
5.8 清理
5.8.1 清理緩存
public void clear() {currentData.clear(); } public void clearAndRefresh() throws Exception {currentData.clear();offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); }清空本地?cái)?shù)據(jù)
如果需要?jiǎng)t使用RefreshMode.STANDARD模式,刷新
5.8.2 清理緩存數(shù)據(jù)
public void clearDataBytes(String fullPath) {clearDataBytes(fullPath, -1); } public boolean clearDataBytes(String fullPath, int ifVersion) {ChildData data = currentData.get(fullPath);if ( data != null ){if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) ){if ( data.getData() != null ){currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));}return true;}}return false; }保留緩存信息,但是數(shù)據(jù)部分制空
5.9 鏈接狀態(tài)變化
在啟動(dòng)時(shí)(start())中為鏈接添加了connectionStateListener監(jiān)聽器:
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);} };private void handleStateChange(ConnectionState newState) {switch ( newState ){case SUSPENDED:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));break;}case LOST:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));break;}case CONNECTED:case RECONNECTED:{try{offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}break;}} }主要都是根據(jù)鏈接狀態(tài),觸發(fā)不同的操作,以及觸發(fā)業(yè)務(wù)監(jiān)聽器來執(zhí)行。
- 由于數(shù)據(jù)都是緩存,所以在鏈接丟失,中斷時(shí),僅僅時(shí)觸發(fā)事件,并沒有將數(shù)據(jù)置為不可用
- 當(dāng)鏈接建立CONNECTED,以及恢復(fù)時(shí)RECONNECTED都觸發(fā)了一次RefreshMode.FORCE_GET_DATA_AND_STAT模式的刷新操作。
5.10 關(guān)閉
在使用完之后,需要調(diào)用close()方法:
public void close() throws IOException {if ( state.compareAndSet(State.STARTED, State.CLOSED) ){client.getConnectionStateListenable().removeListener(connectionStateListener);listeners.clear();executorService.close();client.clearWatcherReferences(childrenWatcher);client.clearWatcherReferences(dataWatcher);// TODO// This seems to enable even more GC - I'm not sure why yet - it// has something to do with Guava's cache and circular referencesconnectionStateListener = null;childrenWatcher = null;dataWatcher = null;} }- 原子操作,將狀態(tài)更新為CLOSED
- 移除鏈接狀態(tài)監(jiān)聽器
- 清空業(yè)務(wù)監(jiān)聽器
- 關(guān)閉線程池
- 清空節(jié)點(diǎn)監(jiān)聽器
- 清空數(shù)據(jù)監(jiān)聽器
6. 小結(jié)
PathChildrenCache雖然名字帶有Cache。 但其實(shí)并不是一個(gè)完整的緩存。
應(yīng)該說,它僅僅是對(duì)path下諸多節(jié)點(diǎn)進(jìn)行統(tǒng)一的管理。 當(dāng)這些節(jié)點(diǎn)發(fā)生變動(dòng),或者數(shù)據(jù)發(fā)生變化時(shí),都可以被PathChildrenCache發(fā)現(xiàn),并同步到本地Map中。以此來達(dá)到一個(gè)緩存的概念。
從API中也能發(fā)現(xiàn),它只能獲取數(shù)據(jù)。至于放置緩存,則需要另外實(shí)現(xiàn)。
- 其實(shí)也簡單,直接向path下新建節(jié)點(diǎn)并寫入數(shù)據(jù)就行
可以通過getListenable().addListener(listener);添加自定義監(jiān)聽器,從而實(shí)現(xiàn)對(duì)緩存進(jìn)行更細(xì)致的控制。
7. 示例
這里可以參考官方的示例
轉(zhuǎn)載于:https://my.oschina.net/roccn/blog/918209
總結(jié)
以上是生活随笔為你收集整理的[Curator] Path Cache 的使用与分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (转)Arcgis for Js之Gra
- 下一篇: 在SpringMVC中使用Jackson