ZooKeeper源碼(05)數據存儲
本文詳細分析一下zookeeper的數據存儲。
ZKDatabase
維護ZooKeeper服務器的內存數據庫,包括session、dataTree和committedLog數據;啟動時從磁盤讀取快照(snapshot)和事務日誌(txnlog)來恢復這些數據。
關鍵字段
// Tree of data nodes (the in-memory znode tree)
protected DataTree dataTree;
// Session id -> session timeout
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog; // used to access the underlying snapshot and txn-log files
// zxids of the first and the last proposal currently held in committedLog
protected long minCommittedLog, maxCommittedLog;
// Size limit of committedLog (default 500); addCommittedProposal evicts the oldest
// entry once the queue holds more than this many proposals
public int commitLogCount;
// Most recently committed proposals, kept so followers can be synchronized quickly
protected Queue<Proposal> committedLog = new ArrayDeque<>();
// Its write lock is held in addCommittedProposal while committedLog and
// min/maxCommittedLog are updated
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
// Set to true once data has been loaded (loadDataBase, fastForwardDataBase, deserializeSnapshot)
private volatile boolean initialized = false;
// Number of transactions appended to the txn log (incremented by append)
private AtomicInteger txnCount = new AtomicInteger(0);
構造方法
/**
 * Creates the database around the given snapshot/txn-log accessor, with a freshly
 * created DataTree and an empty session map. Data from disk is loaded later by
 * loadDataBase() or fastForwardDataBase().
 *
 * @param snapLog accessor for the on-disk snapshots and transaction logs
 */
public ZKDatabase(FileTxnSnapLog snapLog) {
dataTree = createDataTree();
sessionsWithTimeouts = new ConcurrentHashMap<>();
this.snapLog = snapLog;
// (omitted in this excerpt) snapshotSizeFactor is initialized here, default 0.33
// (omitted in this excerpt) commitLogCount is initialized here, default 500
}
/**
 * Factory hook for the tree that backs this database; builds a plain DataTree.
 *
 * @return a newly constructed DataTree
 */
public DataTree createDataTree() {
    final DataTree freshTree = new DataTree();
    return freshTree;
}
創建DataTree對象:創建/zookeeper、/zookeeper/quota、/zookeeper/config節點,創建dataWatches和childWatches對象(使用WatchManager實現類)。
主要方法
// Returns the committedLog collection
public synchronized Collection<Proposal> getCommittedLog();
// Returns the value of dataTree.lastProcessedZxid
public long getDataTreeLastProcessedZxid();
// Returns dataTree.getSessions()
public Collection<Long> getSessions();
// Returns the size of sessionsWithTimeouts
public long getSessionCount();
// Loads the dataTree from disk and replays the txnLog into committedLog
public long loadDataBase() throws IOException;
// Replays the txnLog from disk into the dataTree and committedLog (no snapshot is read)
public long fastForwardDataBase() throws IOException;
// Wraps the txn in a Request and adds it via addCommittedProposal(Request)
private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest);
// Adds a proposal to committedLog
public void addCommittedProposal(Request request);
// Reads Proposals from the txnLog, starting at startZxid
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit);
// Delegates to dataTree.removeCnxn(cnxn)
public void removeCnxn(ServerCnxn cnxn);
// Delegates to dataTree.killSession(sessionId, zxid)
public void killSession(long sessionId, long zxid);
// Delegates to dataTree.dumpEphemerals(pwriter)
public void dumpEphemerals(PrintWriter pwriter);
// Delegates to dataTree.getEphemerals()
public Map<Long, Set<String>> getEphemerals();
// Delegates to dataTree.getNodeCount()
public int getNodeCount();
// Delegates to dataTree.getEphemerals(sessionId)
public Set<String> getEphemerals(long sessionId);
// Assigns dataTree.lastProcessedZxid
public void setlastProcessedZxid(long zxid);
// Delegates to dataTree.processTxn(hdr, txn, digest)
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest);
// Delegates to dataTree.statNode(path, serverCnxn)
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException;
// Delegates to dataTree.getNode(path)
public DataNode getNode(String path);
// Delegates to dataTree.getData(path, stat, watcher)
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// Implemented via dataTree.setWatches
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
List<String> persistentWatches, List<String> persistentRecursiveWatches,
Watcher watcher);
// Delegates to dataTree.addWatch(basePath, watcher, mode)
public void addWatch(String basePath, Watcher watcher, int mode);
// Delegates to dataTree.getChildren(path, stat, watcher)
public List<String> getChildren(
String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// Delegates to dataTree.getAllChildrenNumber(path)
public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException;
// Truncates the txn log to the specified zxid and reloads the database
public boolean truncateLog(long zxid) throws IOException;
// Deserialize a snapshot from an input archive
public void deserializeSnapshot(InputArchive ia) throws IOException;
// Deserialize a snapshot that contains FileHeader from an input archive
// It is used by the admin restore command
public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException;
// Serialize the snapshot
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException;
// Appends via snapLog.append(si) and increments txnCount
public boolean append(Request si) throws IOException;
// Rolls the underlying txnLog via snapLog.rollLog()
public void rollLog() throws IOException;
// Commits the underlying txnLog via snapLog.commit()
public void commit() throws IOException;
// Initializes the /zookeeper/config data (covered in the cluster-startup article)
public synchronized void initConfigInZKDatabase(QuorumVerifier qv);
// Delegates to dataTree.containsWatcher(path, type, watcher)
public boolean containsWatcher(String path, WatcherType type, Watcher watcher);
// Delegates to dataTree.removeWatch(path, type, watcher)
public boolean removeWatch(String path, WatcherType type, Watcher watcher);
loadDataBase方法
從磁盤加載dataTree并把txnLog加載到committedLog中:
/**
 * Loads the dataTree from disk and replays the txnLog into committedLog.
 *
 * @return the zxid returned by snapLog.restore(...)
 * @throws IOException if reading the snapshot or txn log fails
 */
public long loadDataBase() throws IOException {
long startTime = Time.currentElapsedTime();
// 1. Restore the dataTree from a snapshot
// 2. Replay the txnLog via fastForwardFromEdits into the dataTree and committedLog
//    (commitProposalPlaybackListener feeds each replayed txn into committedLog)
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
// (omitted in this excerpt; startTime is presumably used there for load-time metrics — confirm)
return zxid;
}
fastForwardDataBase方法
從txnLog加載dataTree和committedlog集:
/**
 * Replays transactions from the txn log (without reading a snapshot) into the
 * dataTree and committedLog, then marks the database initialized.
 *
 * @return the zxid reported by snapLog.fastForwardFromEdits(...)
 * @throws IOException if reading the txn log fails
 */
public long fastForwardDataBase() throws IOException {
    // The playback listener routes every replayed txn to addCommittedProposal.
    final long lastReplayedZxid =
            snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return lastReplayedZxid;
}
addCommittedProposal方法
/**
 * Wraps a transaction (header, record and digest) in a Request and records it
 * through addCommittedProposal(Request).
 */
private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
    final Request wrapped = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
    wrapped.setTxnDigest(digest);
    addCommittedProposal(wrapped);
}
/**
 * Appends a committed request to committedLog as a PROPOSAL packet.
 *
 * Once the log holds more than commitLogCount proposals, the oldest one is evicted.
 * minCommittedLog and maxCommittedLog are kept equal to the zxids of the queue's head
 * and tail.
 *
 * @param request committed request; its serialized data becomes the packet payload
 */
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    // Acquire before entering try: if lock() failed inside the try, the finally
    // block would call unlock() on a lock this thread does not hold.
    wl.lock();
    try {
        if (committedLog.size() > commitLogCount) {
            committedLog.remove();
            // peek() is null if the queue just became empty (e.g. commitLogCount == 0);
            // the isEmpty() branch below then resets both bounds.
            Proposal oldest = committedLog.peek();
            if (oldest != null) {
                minCommittedLog = oldest.packet.getZxid();
            }
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }
        byte[] data = request.getSerializeData();
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}
getProposalsFromTxnLog方法
從txnlog獲取Proposal,只填充packet字段:
/**
 * Returns an iterator over the Proposals stored in the txn log, starting at
 * startZxid. Only the packet field of each Proposal is populated.
 *
 * TxnLogProposalIterator.EMPTY_ITERATOR is returned in any of these cases:
 * <ul>
 *   <li>sizeLimit is negative;</li>
 *   <li>the first txn read has a zxid greater than startZxid, so the log cannot
 *       guarantee it starts right at startZxid;</li>
 *   <li>sizeLimit is positive and the log's storage size exceeds it;</li>
 *   <li>reading the log fails with an IOException.</li>
 * </ul>
 *
 * @param startZxid zxid to start reading from
 * @param sizeLimit maximum allowed txn-log storage size; 0 disables the check
 * @return an iterator over the proposals, or an empty iterator as described above
 */
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) {
    if (sizeLimit < 0) {
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    TxnIterator itr = null;
    try {
        // Read the txn log files (backed by FileTxnIterator's file stream).
        itr = snapLog.readTxnLog(startZxid, false);
        // If we cannot guarantee that this is strictly the starting txn
        // after a given zxid, we should fail.
        if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
            closeQuietly(itr);
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }
        if (sizeLimit > 0 && itr.getStorageSize() > sizeLimit) {
            closeQuietly(itr);
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }
    } catch (IOException e) {
        // itr is still null when readTxnLog itself failed.
        closeQuietly(itr);
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    return new TxnLogProposalIterator(itr);
}

/** Closes itr if it is non-null; close failures are ignored because the caller is already returning an empty result. */
private static void closeQuietly(TxnIterator itr) {
    if (itr == null) {
        return;
    }
    try {
        itr.close();
    } catch (IOException ignored) {
        // Best effort: nothing useful can be done; the caller falls back to an empty iterator.
    }
}
truncateLog方法
先清空內存數據(clear),再把txnlog數據truncate到指定的zxid位置;截斷成功後重新加載DataTree數據:
/**
 * Clears the in-memory state and truncates the on-disk txn log to zxid. If the
 * truncation succeeded, the database is reloaded from disk.
 *
 * @param zxid zxid to truncate the log to
 * @return true if the log was truncated (and the database reloaded), false otherwise
 * @throws IOException if reloading the database fails
 */
public boolean truncateLog(long zxid) throws IOException {
    clear();
    // truncate the log
    final boolean truncated = snapLog.truncateLog(zxid);
    if (truncated) {
        loadDataBase();
    }
    return truncated;
}
deserializeSnapshot方法
/**
 * Replaces the in-memory state with a snapshot read from {@code ia} (data tree and
 * session timeouts) and marks the database initialized.
 *
 * @param ia archive to read the snapshot from
 * @throws IOException if the snapshot cannot be read
 */
public void deserializeSnapshot(InputArchive ia) throws IOException {
    clear();
    final DataTree tree = getDataTree();
    SerializeUtils.deserializeSnapshot(tree, ia, getSessionWithTimeOuts());
    initialized = true;
}
/**
 * Replaces the in-memory state with a snapshot that contains a FileHeader (used by
 * the admin restore command). The stream's seal is checked after the data tree and
 * after each optional trailing section that was read.
 *
 * @param ia archive to read the snapshot from
 * @param is checked stream underlying ia, passed to SnapStream.checkSealIntegrity
 * @throws IOException if reading fails or an integrity check fails
 */
public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
clear();
// deserialize data tree (this local shadows the dataTree field; both refer to getDataTree())
final DataTree dataTree = getDataTree();
FileSnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
SnapStream.checkSealIntegrity(is, ia);
// deserialize digest and check integrity
// (the boolean result presumably signals that a digest section was present — confirm)
if (dataTree.deserializeZxidDigest(ia, 0)) {
SnapStream.checkSealIntegrity(is, ia);
}
// deserialize lastProcessedZxid and check integrity (same presence convention as above)
if (dataTree.deserializeLastProcessedZxid(ia)) {
SnapStream.checkSealIntegrity(is, ia);
}
// compare the digest to find inconsistency
if (dataTree.getDigestFromLoadedSnapshot() != null) {
dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
}
initialized = true;
}
serializeSnapshot方法
/**
 * Writes the data tree together with the session-timeout map to {@code oa}.
 *
 * @param oa archive to write the snapshot to
 */
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
    final DataTree tree = getDataTree();
    SerializeUtils.serializeSnapshot(tree, oa, getSessionWithTimeOuts());
}
DataTree
維護樹狀結構,沒有任何網絡或客戶端連接代碼,因此可以以獨立的方式進行測試。
維護兩個并行的數據結構:一個從完整路徑映射到DataNodes的哈希表和一個DataNodes樹,對路徑的所有訪問都是通過哈希表進行的,只有在序列化到磁盤時才遍歷DataNodes樹。
關鍵字段
// This map provides a fast lookup to the data nodes (full path -> DataNode)
private final NodeHashMap nodes;
// Watch managers: dataWatches is triggered with NodeCreated/NodeDeleted/NodeDataChanged
// for a path; childWatches with NodeChildrenChanged on the parent (and NodeDeleted on the
// deleted path)
private IWatchManager dataWatches;
private IWatchManager childWatches;
// cached total size of paths and data for all DataNodes
private final AtomicLong nodeDataSize = new AtomicLong(0);
// This hashtable lists the paths of the ephemeral nodes of a session (session id -> paths)
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
// This set contains the paths of all container nodes
private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This set contains the paths of all ttl nodes
private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This is a pointer to the root of the DataTree
private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper filesystem that is the proc filesystem of zookeeper
private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper/quota node for maintaining quota properties for zookeeper
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// The most recently processed zxid
public volatile long lastProcessedZxid = 0;
- NodeHashMap - NodeHashMapImpl實現類使用ConcurrentHashMap保存path -> DataNode數據
- IWatchManager和Watcher - 監聽器管理
- DataNode - 封裝樹節點信息,包括data、children、stat等
構造方法
/**
 * Creates a tree pre-populated with the built-in nodes and creates the data and
 * child watch managers.
 *
 * The built-in nodes are the root (registered under both "" and "/"), /zookeeper,
 * /zookeeper/quota and /zookeeper/config.
 *
 * @param digestCalculator digest calculator, stored and handed to the node map
 * @throws IllegalStateException if a watch manager cannot be created
 */
DataTree(DigestCalculator digestCalculator) {
    this.digestCalculator = digestCalculator;
    nodes = new NodeHashMapImpl(digestCalculator);
    // rather than fight it, let root have an alias
    nodes.put("", root); // "" -> root
    // "/" -> root; the root's digest is already counted under "", so skip it here
    nodes.putWithoutDigest(rootZookeeper, root);
    // add the proc node and quota node
    root.addChild(procChildZookeeper); // "zookeeper" child of the root
    nodes.put(procZookeeper, procDataNode); // "/zookeeper" -> procDataNode
    procDataNode.addChild(quotaChildZookeeper); // "quota" child of /zookeeper
    nodes.put(quotaZookeeper, quotaDataNode); // "/zookeeper/quota" -> quotaDataNode
    addConfigNode(); // adds the /zookeeper/config node
    nodeDataSize.set(approximateDataSize());
    try {
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {
        // Swallowing this would leave the watch managers null, so every later
        // triggerWatch/addWatch call would fail with a NullPointerException.
        throw new IllegalStateException("Unable to create WatchManager", e);
    }
}
/**
 * Adds the /zookeeper/config node under /zookeeper and restricts it to
 * READ_ACL_UNSAFE (ZOOKEEPER-2014).
 *
 * @throws IllegalStateException if setting the ACL reports that /zookeeper/config
 *                               does not exist right after it was inserted
 */
public void addConfigNode() {
    DataNode zookeeperZnode = nodes.get(procZookeeper); // look up the /zookeeper node
    if (zookeeperZnode != null) {
        zookeeperZnode.addChild(configChildZookeeper); // register "config" as its child
    }
    nodes.put(configZookeeper, new DataNode(new byte[0], -1L, new StatPersisted()));
    try {
        // Reconfig node is access controlled by default (ZOOKEEPER-2014).
        setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
    } catch (NoNodeException e) {
        // The node was inserted just above; reaching this means the tree is inconsistent.
        throw new IllegalStateException(
                "There's no " + configZookeeper + " znode - this should never happen.", e);
    }
}
主要方法
// Add a new node to the DataTree
public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner,
int parentCVersion, long zxid, long time, Stat outputStat);
// Remove path from the DataTree
public void deleteNode(String path, long zxid);
// Sets the data of a node
public Stat setData(String path, byte[] data, int version, long zxid, long time);
// 1. Reads the data at path
// 2. Registers watcher via addWatch if watcher is non-null
public byte[] getData(String path, Stat stat, Watcher watcher);
// Copies the node's stat via node.copyStat(stat)
public Stat statNode(String path, Watcher watcher);
// 1. Copies the node's stat into stat
// 2. addWatch
// 3. Returns the children
public List<String> getChildren(String path, Stat stat, Watcher watcher);
// Set / get ACLs
public Stat setACL(String path, List<ACL> acl, int version);
public List<ACL> getACL(String path, Stat stat);
public List<ACL> getACL(DataNode node);
// Adds a watcher
public void addWatch(String basePath, Watcher watcher, int mode);
// Applies a transaction to the tree
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn);
// Kills a session; uses deleteNodes to delete paths2DeleteLocal and paths2DeleteInTxn
void killSession(
long session, long zxid, Set<String> paths2DeleteLocal, List<String> paths2DeleteInTxn);
// Calls deleteNode for every path in paths2Delete
void deleteNodes(long session, long zxid, Iterable<String> paths2Delete);
// Recursively counts the total number of nodes and bytes under path
private void getCounts(String path, Counts counts);
// Serialization
void serializeNode(OutputArchive oa, StringBuilder path);
public void serializeNodeData(OutputArchive oa, String path, DataNode node);
public void serializeAcls(OutputArchive oa);
public void serializeNodes(OutputArchive oa);
public void serialize(OutputArchive oa, String tag);
// Deserialization
public void deserialize(InputArchive ia, String tag);
// Removes watcher from dataWatches and childWatches
public void removeCnxn(Watcher watcher);
// Triggers or registers (addWatch) the given watches
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches,
List<String> persistentWatches, List<String> persistentRecursiveWatches,
Watcher watcher);
// Sets a new cversion and pzxid on path
public void setCversionPzxid(String path, int newCversion, long zxid);
// Add the digest to the historical list, and update the latest zxid digest
private void logZxidDigest(long zxid, long digest);
// Serialize / deserialize lastProcessedZxidDigest
public boolean serializeZxidDigest(OutputArchive oa);
public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot);
// Serialize / deserialize lastProcessedZxid
public boolean serializeLastProcessedZxid(final OutputArchive oa);
public boolean deserializeLastProcessedZxid(final InputArchive ia);
// Compares the actual tree's digest with that in the snapshot.
// Resets digestFromLoadedSnapshot after comparison.
public void compareSnapshotDigests(long zxid);
// Compares the digest of the tree with the digest present in transaction digest.
// If there is any error, logs and alerts the watchers.
public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest);
createNode方法
processTxn中會使用該方法創建節點:
/**
 * Adds a new node to the DataTree; processTxn uses it for the create* opcodes.
 *
 * @param path           full path of the node to create
 * @param data           initial node data
 * @param acl            ACL list; converted to an ACL-cache id
 * @param ephemeralOwner 0 for a regular node, the owning session id for an ephemeral
 *                       node, or an encoded container/TTL marker (see EphemeralType.get)
 * @param parentCVersion cversion to store on the parent; -1 means "parent's cversion + 1".
 *                       The parent is only updated when this exceeds its current cversion.
 * @param zxid           zxid of the creating transaction
 * @param time           creation time
 * @param outputStat     if non-null, receives a copy of the new node's stat
 * @throws NoNodeException     if the parent node does not exist
 * @throws NodeExistsException if the parent already has a child with this name
 */
public void createNode(final String path, byte[] data, List<ACL> acl,
        long ephemeralOwner, int parentCVersion, long zxid,
        long time, Stat outputStat) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner); // Create a node stat
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new NoNodeException();
    }
    synchronized (parent) {
        // NOTE(review): the ACL is registered in the cache before the existence check,
        // so a NodeExistsException leaves one extra cache reference behind. This ordering
        // looks deliberate upstream — confirm before moving the check above it.
        Long acls = aclCache.convertAcls(acl);
        Set<String> children = parent.getChildren();
        if (children.contains(childName)) {
            throw new NodeExistsException();
        }
        nodes.preChange(parentName, parent); // digest bookkeeping (removeDigest); paired with postChange
        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++; // next child version
        }
        // Only ever move the parent's cversion forward
        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion); // parent's child version
            parent.stat.setPzxid(zxid); // zxid of the last child change on the parent
        }
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName);
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // register the full path in the NodeHashMap
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner); // usually VOID or NORMAL
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            // track the ephemeral node under its owning session
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        if (outputStat != null) {
            child.copyStat(outputStat); // copy the new node's stat (not its ACL) to the caller
        }
    }
    // ... (omitted in this excerpt)
    // fire watches: NodeCreated on the node, NodeChildrenChanged on its parent
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    childWatches.triggerWatch(
            parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}
deleteNode方法
/**
 * Removes the node at path from the DataTree, then fires the delete watches.
 *
 * The node is unlinked from its parent and removed from the container, TTL and
 * ephemeral bookkeeping.
 *
 * @param path full path of the node to remove
 * @param zxid zxid of the deleting transaction; becomes the parent's pzxid if larger
 * @throws NoNodeException if the parent or the node itself does not exist
 */
public void deleteNode(String path, long zxid) throws NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new NoNodeException();
    }
    synchronized (parent) {
        nodes.preChange(parentName, parent);
        parent.removeChild(childName);
        // Never move the parent's pzxid backwards.
        if (zxid > parent.stat.getPzxid()) {
            parent.stat.setPzxid(zxid);
        }
        nodes.postChange(parentName, parent);
    }
    DataNode node = nodes.get(path);
    if (node == null) {
        throw new NoNodeException();
    }
    nodes.remove(path); // remove from the NodeHashMap
    synchronized (node) {
        aclCache.removeUsage(node.acl);
        nodeDataSize.addAndGet(-getNodeSize(path, node.data));
    }
    synchronized (parent) {
        long owner = node.stat.getEphemeralOwner();
        EphemeralType ephemeralType = EphemeralType.get(owner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.remove(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.remove(path);
        } else if (owner != 0) {
            // Drop the path from its owning session's ephemeral set
            // (named so it does not shadow the `nodes` field).
            Set<String> ephemeralPaths = ephemerals.get(owner);
            if (ephemeralPaths != null) {
                synchronized (ephemeralPaths) {
                    ephemeralPaths.remove(path);
                }
            }
        }
    }
    // ... (omitted in this excerpt)
    // Fire watches: NodeDeleted on the node for data and child watchers (`processed`
    // presumably keeps a watcher from being notified twice), NodeChildrenChanged on the parent.
    WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
    childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
    childWatches.triggerWatch(
            "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}
setData方法
/**
 * Replaces the data of the node at path and updates its mtime, mzxid and version,
 * then fires NodeDataChanged watches.
 *
 * @param version value stored as the node's new version
 * @return a copy of the node's stat after the update
 * @throws NoNodeException if no node exists at path
 */
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    }
    byte[] lastData;
    synchronized (n) {
        // NOTE(review): lastData is not read in this excerpt; presumably the omitted
        // size/quota bookkeeping below uses it — confirm.
        lastData = n.data;
        nodes.preChange(path, n);
        n.data = data;
        n.stat.setMtime(time); // modification time
        n.stat.setMzxid(zxid); // zxid of the modifying txn
        n.stat.setVersion(version);
        n.copyStat(s); // snapshot the stat for the return value
        nodes.postChange(path, n);
    }
    // ... (omitted in this excerpt)
    dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
    return s;
}
setAcl等acl方法
/**
 * Replaces the ACL of the node at path and stores version as its aversion.
 *
 * @param version new ACL version (aversion) of the node
 * @return a copy of the node's stat after the update
 * @throws NoNodeException if no node exists at path
 */
public Stat setACL(String path, List<ACL> acl, int version) throws NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    }
    synchronized (n) {
        Stat stat = new Stat();
        aclCache.removeUsage(n.acl); // release the cache reference held by the old ACL
        nodes.preChange(path, n);
        n.stat.setAversion(version); // ACL version
        n.acl = aclCache.convertAcls(acl);
        n.copyStat(stat);
        nodes.postChange(path, n);
        return stat;
    }
}
/**
 * Returns a copy of the ACL list of the node at path, optionally copying its stat.
 *
 * @param stat if non-null, receives a copy of the node's stat
 * @return a new list containing the node's ACLs
 * @throws NoNodeException if no node exists at path
 */
public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    }
    synchronized (n) {
        if (stat != null) {
            n.copyStat(stat);
        }
        // Copy, since the list from aclCache may be shared with other nodes.
        return new ArrayList<>(aclCache.convertLong(n.acl));
    }
}
/**
 * Resolves the ACL list referenced by the given node, under the node's monitor.
 *
 * @param node node whose ACL id is looked up in the ACL cache
 * @return the ACL list returned by the cache (not copied)
 */
public List<ACL> getACL(DataNode node) {
    final List<ACL> resolved;
    synchronized (node) {
        resolved = aclCache.convertLong(node.acl);
    }
    return resolved;
}
addWatch方法
/**
 * Registers a persistent (PERSISTENT or PERSISTENT_RECURSIVE) watch on basePath.
 *
 * The watcher is always added to dataWatches. It is also added to childWatches
 * unless the mode is PERSISTENT_RECURSIVE.
 *
 * @param mode ZooDefs add-watch mode, converted with WatcherMode.fromZooDef
 */
public void addWatch(String basePath, Watcher watcher, int mode) {
    final WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
    dataWatches.addWatch(basePath, watcher, watcherMode);
    final boolean recursive = watcherMode == WatcherMode.PERSISTENT_RECURSIVE;
    if (!recursive) {
        // A non-recursive (PERSISTENT) watch also wants child-list changes of basePath.
        childWatches.addWatch(basePath, watcher, watcherMode);
    }
}
processTxn方法
/**
 * Applies one transaction to the tree and reports the outcome.
 *
 * A KeeperException raised while applying the txn is not propagated; its code is
 * stored in rc.err instead (rc.err is 0 otherwise, or the ErrorTxn's code for OpCode.error).
 *
 * @param header   transaction header (client id, cxid, zxid, time, op type)
 * @param txn      transaction body; its concrete type depends on header.getType()
 * @param isSubTxn true for a sub-transaction; lastProcessedZxid is only advanced when false
 * @return the result (path, stat, err, ...) of applying the transaction
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create: // create a node
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
header.getZxid(), header.getTime(), null);
break;
case OpCode.create2: // create a node and return its stat
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(),
header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createTTL:
CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
rc.path = createTtlTxn.getPath();
stat = new Stat();
createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), // TTL encoded as the ephemeral owner
createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.createContainer:
CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
rc.path = createContainerTxn.getPath();
stat = new Stat();
createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
rc.stat = stat;
break;
case OpCode.delete:
case OpCode.deleteContainer:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid()); // delete the node
break;
case OpCode.reconfig:
// reconfig falls through and is applied as a setData
case OpCode.setData: // set node data
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
header.getZxid(), header.getTime());
break;
case OpCode.setACL: // set the node's ACL
SetACLTxn setACLTxn = (SetACLTxn) txn;
rc.path = setACLTxn.getPath();
rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
break;
case OpCode.closeSession: // close the session and delete its ephemeral nodes
long sessionId = header.getClientId();
if (txn != null) {
killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId),
((CloseSessionTxn) txn).getPaths2Delete());
} else {
killSession(sessionId, header.getZxid());
}
break;
case OpCode.error:
ErrorTxn errTxn = (ErrorTxn) txn;
rc.err = errTxn.getErr();
break;
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
rc.path = checkTxn.getPath();
break;
case OpCode.multi:
// (omitted in this excerpt) applies each sub-txn in turn
break;
}
} catch (KeeperException e) {
rc.err = e.code().intValue();
// NOTE(review): IOException is silently ignored below; nothing visible in this excerpt
// throws it, presumably the omitted multi branch does — confirm and consider logging.
} catch (IOException e) {}
// A plain create that failed with NODEEXISTS still applies the parent cversion/pzxid
// recorded in the txn to the parent node.
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
int lastSlash = rc.path.lastIndexOf('/');
String parentName = rc.path.substring(0, lastSlash);
CreateTxn cTxn = (CreateTxn) txn;
try {
setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
} catch (NoNodeException e) {
rc.err = e.code().intValue();
}
}
// Only top-level transactions advance lastProcessedZxid
if (!isSubTxn) {
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid; // record the latest processed zxid
}
// (omitted in this excerpt)
}
return rc;
}
serialize相關方法
/**
 * Writes the node at {@code path} and, depth-first, all of its descendants to oa.
 *
 * Each node's stat and child list are copied while holding that node's monitor. The
 * lock is released before writing and recursing, so a child can disappear between
 * copying its parent's child list and visiting it; such a child is skipped.
 *
 * @param oa   archive to write to
 * @param path path of the node to write; used as a scratch buffer and modified in place
 * @throws IOException if writing to oa fails
 */
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
    String pathString = path.toString();
    DataNode node = getNode(pathString);
    if (node == null) {
        // Deleted concurrently (or the tree was cleared); nothing to write.
        return;
    }
    String[] children;
    DataNode nodeCopy;
    synchronized (node) {
        StatPersisted statCopy = new StatPersisted();
        copyStatPersisted(node.stat, statCopy);
        // we do not need to make a copy of node.data because the contents are never changed
        nodeCopy = new DataNode(node.data, node.acl, statCopy);
        children = node.getChildren().toArray(new String[0]);
    }
    serializeNodeData(oa, pathString, nodeCopy); // write this node's record
    path.append('/');
    int off = path.length();
    for (String child : children) {
        // Reuse the buffer: reset it to "<path>/" and append the child's name.
        path.delete(off, Integer.MAX_VALUE);
        path.append(child);
        serializeNode(oa, path);
    }
}
/**
 * Writes one snapshot entry: the path (tag "path") followed by the node record (tag "node").
 * Visible for testing.
 */
public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
    oa.writeString(path, "path");
    oa.writeRecord(node, "node");
}

/** Writes the ACL cache to oa; serialize() emits it before the node records. */
public void serializeAcls(OutputArchive oa) throws IOException {
    aclCache.serialize(oa);
}

/**
 * Writes every node reachable from the root (starting with the root's path ""), then the
 * "/" end-of-stream marker. The marker is skipped when root is null, i.e. when clear()
 * ran while the snapshot was being taken.
 */
public void serializeNodes(OutputArchive oa) throws IOException {
    serializeNode(oa, new StringBuilder());
    // No node record uses the path "/" (the root is written as ""), so it can mark the end.
    final boolean treeStillPresent = root != null;
    if (treeStillPresent) {
        oa.writeString("/", "path");
    }
}

/** Writes the complete tree: the ACL cache first, then all nodes. The tag is not used. */
public void serialize(OutputArchive oa, String tag) throws IOException {
    serializeAcls(oa);
    serializeNodes(oa);
}
deserialize相關方法
/**
 * Rebuilds the tree from a snapshot.
 *
 * The ACL cache is read first, then (path, node) records until the "/" end-of-stream
 * marker. Each node is registered in the node map and linked into its parent's child
 * set. It is also added to the container/TTL/ephemeral bookkeeping according to its
 * ephemeral owner. Afterwards the root is re-registered under "/", the cached data size
 * is recomputed, setupQuota() runs and aclCache.purgeUnused() is called.
 *
 * @param ia  archive to read from
 * @param tag not used by the code shown
 * @throws IOException on read errors, or if a node's parent has not been read before it
 */
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
nodeDataSize.set(0);
// NOTE(review): ephemerals, containers and ttls are not reset here; presumably callers
// clear the database first (ZKDatabase.deserializeSnapshot calls clear()) — confirm.
String path = ia.readString("path");
while (!"/".equals(path)) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
// only the root is serialized with the slash-free path ""
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException(
"Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
}
parent.addChild(path.substring(lastSlash + 1));
long owner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(owner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (owner != 0) {
// ephemeral node: record it under its owning session
HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
list.add(path);
}
}
path = ia.readString("path");
}
// have counted digest for root node with "", ignore here to avoid counting twice for root node
nodes.putWithoutDigest("/", root);
nodeDataSize.set(approximateDataSize());
// we are done with deserializing the datatree update the quotas - create path trie
// and also update the stat nodes
setupQuota();
aclCache.purgeUnused();
}
FileTxnSnapLog
操作TxnLog和SnapShot的入口類。
構造方法會創建dataDir和snapDir目錄,判斷數據目錄可寫,創建txnLog和snapLog對象訪問數據文件。
主要方法
// Restores the database from snapshots and transaction logs
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
// Fast-forwards the server database so it contains the latest transactions.
// Same as restore, but only reads the transaction logs; it does not restore from a snapshot.
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
// Returns a TxnIterator starting at zxid, via txnLog.read(zxid, fastForward)
public TxnIterator readTxnLog(long zxid, boolean fastForward);
// process the transaction on the datatree
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn);
// Returns the last logged zxid via txnLog.getLastLoggedZxid()
public long getLastLoggedZxid();
// Saves the datatree and sessions to a snapshot file
public File save(
DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap);
// Truncates the txnLog to the given zxid
public boolean truncateLog(long zxid);
// Finds the most recent snapshot file via snaplog.findMostRecentSnapshot()
public File findMostRecentSnapshot();
// Finds the n most recent snapshot files via snaplog.findNRecentSnapshots(n)
public List<File> findNRecentSnapshots(int n);
// Finds n valid snapshot files via snaplog.findNValidSnapshots(n)
public List<File> findNValidSnapshots(int n);
// Returns the txn log files that may contain transactions newer than the given zxid:
// the logs whose starting zxid is greater than zxid, plus the newest log whose starting
// zxid is smaller than zxid (that last log may contain transactions beyond zxid).
public File[] getSnapshotLogs(long zxid);
// Appends data via txnLog.append(si)
public boolean append(Request si);
// Commits data via txnLog.commit()
public void commit();
restore方法
- 從snapshot加載dataTree數據
- 從txnlog加載dataTree和committedlog數據
- 如果沒有加載到dataTree數據,將空的dataTree數據保存到snapshot.0文件中
fastForwardFromEdits方法
從txnlog加載dataTree和committedlog數據。
processTransaction方法
/**
 * Applies one replayed transaction to the data tree.
 *
 * The session map is updated first: createSession records the session's timeout and
 * closeSession removes the session. Every transaction, including session ones, is then
 * passed to dt.processTxn, which also gives the tree a chance to sync its lastProcessedZxid.
 */
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions,
        Record txn) throws KeeperException.NoNodeException {
    final int opType = hdr.getType();
    if (opType == OpCode.createSession) {
        sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
    } else if (opType == OpCode.closeSession) {
        sessions.remove(hdr.getClientId());
    }
    ProcessTxnResult rc = dt.processTxn(hdr, txn);
}
save方法
/**
 * Saves dataTree and sessionsWithTimeouts to a new snapshot file named after
 * dataTree.lastProcessedZxid (snapshot.${lastZxid}) in the snapshot directory.
 *
 * @param syncSnap passed through to the snapshot serializer
 * @return the snapshot file that was written
 * @throws IOException if serialization fails. A zero-length snapshot file left behind
 *                     by the failure (e.g. on a full disk) is deleted first.
 */
public File save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
        boolean syncSnap) throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    try {
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        return snapshotFile;
    } catch (IOException e) {
        // An empty file carries no data but would still look like a snapshot (e.g. to
        // snapshot retention/cleanup), so remove it. A partially written file is left in
        // place for inspection.
        if (snapshotFile.isFile() && snapshotFile.length() == 0 && !snapshotFile.delete()) {
            e.addSuppressed(new IOException(
                    "Could not delete empty snapshot file: " + snapshotFile.getAbsolutePath()));
        }
        throw e;
    }
}
truncateLog方法
/**
 * Truncates the transaction log so that it ends at zxid.
 *
 * The current txnLog and snapLog are closed first. They are always re-opened
 * afterwards, even if closing or truncating fails, so this object stays usable.
 *
 * @param zxid zxid to truncate the log to
 * @return the result of FileTxnLog.truncate(zxid), or false if an IOException occurred
 */
public boolean truncateLog(long zxid) {
    try {
        // close the existing txnLog and snapLog
        close();
        // truncate it
        try (FileTxnLog truncLog = new FileTxnLog(dataDir)) {
            return truncLog.truncate(zxid);
        }
    } catch (IOException e) {
        // Callers only get a boolean; an I/O failure is reported as "not truncated".
        return false;
    } finally {
        // re-open the txnLog and snapLog
        // I'd rather just close/reopen this object itself, however that
        // would have a big impact outside ZKDatabase as there are other
        // objects holding a reference to this object.
        txnLog = new FileTxnLog(dataDir);
        snapLog = new FileSnap(snapDir);
    }
}
TxnLog接口和FileTxnLog實現類
txnlog
使用文件保存所有的事務操作。客戶端的寫操作會先寫入txnlog文件,在過半數(quorum)服務器確認後才提交到dataTree中;在ZKDatabase啟動階段,如果txnlog的zxid大於snapshot的zxid,會加載txnlog文件數據回放事務,提交到dataTree中。
TxnLog接口
Interface for reading and writing transaction logs.
/**
 * Read/write access to the transaction log.
 */
public interface TxnLog extends Closeable {

    // ----- writing -----

    /**
     * Appends a request, with its digest, to the transaction log.
     *
     * @return whether an entry was appended (FileTxnLog returns false for requests
     *         without a txn header)
     */
    boolean append(Request request) throws IOException;

    /** Commits the appended transactions and makes sure they are persisted. */
    void commit() throws IOException;

    /** Rolls the log currently being appended to. */
    void rollLog() throws IOException;

    /** Truncates the log at zxid to get in sync with the leader. */
    boolean truncate(long zxid) throws IOException;

    // ----- reading -----

    /** Starts reading the transaction logs from the given zxid. */
    TxnIterator read(long zxid) throws IOException;

    /** Returns the last zxid of the logged transactions. */
    long getLastLoggedZxid() throws IOException;

    /** Returns the dbid of this transaction log. */
    long getDbId() throws IOException;

    // ----- monitoring -----

    /** Sets the ServerStats used to monitor fsync-threshold violations. */
    void setServerStats(ServerStats serverStats);

    /** Returns the transaction log's elapsed sync time, in milliseconds. */
    long getTxnLogSyncElapsedTime();

    /** Sets the total log size (semantics defined by the implementation, not shown here). */
    void setTotalLogSize(long size);

    /** Returns the total log size (semantics defined by the implementation, not shown here). */
    long getTotalLogSize();

    // ----- lifecycle -----

    /** Closes the transaction log. */
    @Override
    void close() throws IOException;
}
FileTxnLog實現類
This class implements the TxnLog interface. It provides APIs to access the txnlogs and add entries to them.
The format of a Transactional log is as follows:
LogFile:
FileHeader TxnList ZeroPad
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList:
Txn || Txn TxnList
Txn:
checksum Txnlen TxnHeader Record 0x42
checksum: 8bytes Adler32 is currently used
calculated across payload -- Txnlen, TxnHeader, Record and 0x42
Txnlen:
len 4bytes
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
Record:
See Jute definition file for details on the various record types
ZeroPad:
0 padded to EOF (filled during preallocation stage)
FileTxnLog主要方法實現
/**
 * Appends a request's transaction to the current log file. If no file is open, a new
 * log file named after the request's zxid is created and a FileHeader is written first.
 *
 * Each entry is written as a checksum of the serialized txn, followed by the
 * length-prefixed txn bytes and the 0x42 end marker (see Util.writeTxnBytes). Entries
 * go to a buffered stream; commit() flushes them.
 *
 * @return false if the request carries no txn header (not a transaction), true otherwise
 * @throws IOException if creating or writing the log file fails
 */
public synchronized boolean append(Request request) throws IOException {
TxnHeader hdr = request.getHdr();
if (hdr == null) { // not a transaction request
return false;
}
// zxids are expected to increase; an out-of-order zxid is only logged
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("...");
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream == null) {
// create a new log.${hdr.zxid} file
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream = new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); // file header
long dataSize = oa.getDataSize();
fhdr.serialize(oa, "fileheader"); // write the file header
logStream.flush();
// advance the file position past the header bytes just written
filePosition += oa.getDataSize() - dataSize;
filePadding.setCurrentSize(filePosition);
streamsToFlush.add(fos);
}
// preallocate/pad the file as needed (see ZeroPad in the format notes)
fileSize = filePadding.padFile(fos.getChannel(), filePosition);
byte[] buf = request.getSerializeData();
long dataSize = oa.getDataSize();
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC"); // checksum
Util.writeTxnBytes(oa, buf); // writes len, hdr, txn, digest, 0x42
unFlushedSize += oa.getDataSize() - dataSize; // bytes not yet flushed; commit() adds them to filePosition
return true;
}
/**
 * Returns the zxid of the last transaction in the log.
 *
 * The starting value is the zxid encoded in the newest log file's name (-1 when there
 * are no log files). The log is then read from that zxid onward, and the zxid of the
 * last readable transaction header wins. Read errors are ignored and the best value
 * found so far is returned.
 */
public long getLastLoggedZxid() {
    final File[] logFiles = getLogFiles(logDir.listFiles(), 0);
    final long newestLogStart = (logFiles.length == 0)
            ? -1
            : Util.getZxidFromName(logFiles[logFiles.length - 1].getName(), LOG_FILE_PREFIX);
    long lastZxid = newestLogStart;
    try (FileTxnLog reader = new FileTxnLog(logDir); TxnIterator it = reader.read(newestLogStart)) {
        while (it.next()) {
            lastZxid = it.getHeader().getZxid();
        }
    } catch (IOException ignored) {
        // Best effort: keep the last zxid read before the failure.
    }
    return lastZxid;
}
/**
 * Stops writing to the current log file so that the next append() creates a new one.
 *
 * <p>Buffered data is flushed, and the current file's size is added to prevLogsRunningTotal.
 * The per-file write state is then reset. The underlying file stream is not closed here.
 * Does nothing when no log file is open.
 *
 * @throws IOException if flushing fails
 */
public synchronized void rollLog() throws IOException {
    if (logStream == null) {
        return;
    }
    logStream.flush();
    prevLogsRunningTotal += getCurrentLogSize();
    // With logStream cleared, append() opens a fresh file on its next call.
    logStream = null;
    oa = null;
    fileSize = 0;
    filePosition = 0;
    unFlushedSize = 0;
}
/**
 * Flushes pending transaction-log data and, when forceSync is enabled, forces it to the
 * storage device.
 *
 * <p>Steps, in order:
 * <ol>
 * <li>Flush the buffered stream and advance filePosition by the bytes appended since the last
 * commit. fileSize grows if the writes went past the preallocated size.</li>
 * <li>Flush every stream in streamsToFlush; if forceSync is set, also force each one's channel.</li>
 * <li>Close all but the last stream in streamsToFlush.</li>
 * <li>Roll to a new log file if txnLogSizeLimit is positive and the current log exceeds it.</li>
 * </ol>
 *
 * @throws IOException if flushing, syncing, closing or rolling the log fails
 */
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush(); // push buffered bytes into the current log file
filePosition += unFlushedSize;
// If we have written more than we have previously preallocated,
// we should override the fileSize by filePosition.
if (filePosition > fileSize) {
fileSize = filePosition;
}
unFlushedSize = 0;
}
for (FileOutputStream log : streamsToFlush) {
log.flush(); // flush each pending file stream
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false); // force file content to the device (metadata update not required)
// (sync-duration bookkeeping omitted in this excerpt)
}
}
// Close every stream except the last one in the queue (the most recently added, normally the active log file).
while (streamsToFlush.size() > 1) {
streamsToFlush.poll().close();
}
// Roll the log file if we exceed the size limit
if (txnLogSizeLimit > 0) { // the article notes the default is -1, so this branch is normally skipped
long logSize = getCurrentLogSize();
if (logSize > txnLogSizeLimit) {
rollLog();
}
}
}
/**
 * Opens an iterator over the transaction log starting at {@code zxid}, fast-forwarding past
 * older entries; equivalent to {@code read(zxid, true)}.
 *
 * <p>The returned FileTxnIterator wraps the log files and their input stream and reads entries
 * according to the on-disk log format.
 *
 * @param zxid the zxid to start reading from
 * @return an iterator positioned at {@code zxid}
 * @throws IOException if the logs cannot be opened
 */
public TxnIterator read(long zxid) throws IOException {
    final boolean fastForward = true;
    return read(zxid, fastForward);
}
/**
 * Creates a FileTxnIterator over the log files in logDir, starting at {@code zxid}.
 *
 * @param zxid the zxid to start reading from
 * @param fastForward when true, the iterator skips entries whose zxid is below {@code zxid}
 * @return the new iterator
 * @throws IOException if the logs cannot be opened
 */
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
    TxnIterator iterator = new FileTxnIterator(logDir, zxid, fastForward);
    return iterator;
}
/**
 * Truncates the transaction log at {@code zxid}.
 *
 * <p>A FileTxnIterator is opened at {@code zxid}. The file it is reading is cut at the
 * iterator's current input position, and every later log file the iterator can move to is
 * deleted.
 *
 * @param zxid the zxid to truncate at
 * @return always {@code true}
 * @throws IOException if no log file is found or the file cannot be truncated
 */
public boolean truncate(long zxid) throws IOException {
    try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
        PositionInputStream input = itr.inputStream;
        if (input == null) {
            throw new IOException("No log files found to truncate");
        }
        long pos = input.getPosition();
        // now, truncate at the current position; try-with-resources releases the handle
        // even if setLength() fails.
        try (RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw")) {
            raf.setLength(pos);
        }
        // Delete every log file after the truncated one.
        while (itr.goToNextLog()) {
            if (!itr.logFile.delete()) {
                // A leftover file still holds entries past zxid; make that visible.
                LOG.warn("Unable to truncate " + itr.logFile);
            }
        }
    }
    return true;
}
/**
 * Reads the FileHeader (magic, version, dbid) stored at the start of {@code file}.
 *
 * @param file the log file to inspect
 * @return the deserialized header
 * @throws IOException if the file cannot be opened or the header cannot be deserialized
 */
private static FileHeader readHeader(File file) throws IOException {
    // try-with-resources closes the file on both success and failure, so no descriptor leaks.
    try (InputStream is = new BufferedInputStream(new FileInputStream(file))) {
        InputArchive ia = BinaryInputArchive.getArchive(is);
        FileHeader hdr = new FileHeader();
        hdr.deserialize(ia, "fileheader");
        return hdr;
    }
}
FileTxnIterator類
This class implements the TxnIterator interface, which is used for reading the transaction logs.
內部使用List保存著比指定zxid大或者含有指定zxid數據的log文件,初始化階段會定位到參數zxid指定的位置,這樣在后續訪問時就可以從參數指定的zxid開始讀取數據了。
/**
 * Creates an iterator over the log files in {@code logDir} needed to read from {@code zxid},
 * and positions it on the first entry via init().
 *
 * @param logDir directory containing the transaction logs
 * @param zxid the zxid to start from
 * @param fastForward if true, advance past entries whose zxid is below {@code zxid}
 * @throws IOException if the logs cannot be read
 */
public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
    this.logDir = logDir;
    this.zxid = zxid;
    init();
    if (!fastForward || hdr == null) {
        return;
    }
    // Step forward until the current entry reaches zxid or there are no more entries.
    while (hdr.getZxid() < zxid && next()) {
        // next() moves to the following entry
    }
}
/**
 * Collects the log files needed to read from zxid onward, then opens the first one and reads
 * its first entry.
 *
 * <p>Files are scanned in descending name order. Every file whose name zxid is at least zxid
 * is kept. The first file whose name zxid is below zxid is also kept, since that file may
 * contain zxid, and the scan stops there.
 *
 * @throws IOException if the logs cannot be read
 */
void init() throws IOException {
    storedFiles = new ArrayList<>();
    List<File> descending = Util.sortDataDir(
            FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
    for (File candidate : descending) {
        long startZxid = Util.getZxidFromName(candidate.getName(), LOG_FILE_PREFIX);
        storedFiles.add(candidate);
        if (startZxid < zxid) {
            break;
        }
    }
    goToNextLog();
    next();
}
SnapShot接口和FileSnap實現類
SnapShot接口
Snapshot interface for the persistence layer. Implement this interface to provide a snapshot implementation.
/**
 * Snapshot interface for the persistence layer; implement it to provide snapshot storage.
 */
public interface SnapShot {
/**
 * Deserializes a data tree (and sessions) from the last valid snapshot.
 *
 * @return the last zxid that was deserialized
 */
long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
/** Persists the data tree and the sessions into persistent storage. */
void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;
/** Finds the most recent snapshot file. */
File findMostRecentSnapshot() throws IOException;
/** Returns information about the last saved or restored snapshot. */
SnapshotInfo getLastSnapshotInfo();
/** Frees resources held by this snapshot immediately. */
void close() throws IOException;
}
FileSnap實現類
負責存儲、序列化和反序列化正確的快照。并提供對快照的訪問:
/**
 * Loads the newest readable snapshot into {@code dt} and {@code sessions}.
 *
 * <p>Up to 100 snapshot files that pass SnapStream.isValidSnapshot are tried newest first. The
 * first one that deserializes and passes every seal check is used. The optional zxid-digest
 * and lastProcessedZxid sections are read when present, each followed by its own seal check.
 *
 * @return the zxid of the loaded snapshot, or -1 if there is no valid snapshot file at all
 * @throws IOException if candidate snapshots exist but none of them could be read
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // Newest first; only files that pass the validity check are returned.
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.isEmpty()) {
        // Nothing to restore; without this guard snap would stay null and NPE below.
        return -1L;
    }
    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    IOException readFailure = null;
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            deserialize(dt, sessions, ia); // load header, sessions and nodes into dt
            SnapStream.checkSealIntegrity(snapIS, ia);
            // Optional section: zxid, digestVersion, digest.
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }
            // Optional section: lastProcessedZxid.
            if (dt.deserializeLastProcessedZxid(ia)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            // A damaged snapshot is not fatal: remember why, then fall back to the next older one.
            if (readFailure == null) {
                readFailure = e;
            } else {
                readFailure.addSuppressed(e);
            }
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir, readFailure);
    }
    // NOTE(review): this overwrites any value read by deserializeLastProcessedZxid above; confirm intended.
    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
    // compare the digest if this is not a fuzzy snapshot, we want to compare and find inconsistent asap
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}
/**
 * Reads a snapshot's file header and body from {@code ia} into {@code dt} and {@code sessions}.
 *
 * <p>Layout read here:
 * <ul>
 * <li>FileHeader: magic, version, dbid. The magic must equal SNAP_MAGIC.</li>
 * <li>Sessions: count, then {id, timeout} per session.</li>
 * <li>Nodes: ACL cache, then {path, node} pairs, where node = {data, acl, stat}.</li>
 * </ul>
 *
 * @throws IOException if the magic does not match or the stream cannot be read
 */
public static void deserialize(
        DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        // Report both values so a wrong or corrupt file can be identified from the log.
        throw new IOException("mismatching magic headers " + header.getMagic() + " != " + SNAP_MAGIC);
    }
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
/**
 * Returns up to {@code n} snapshot files from snapDir that pass SnapStream.isValidSnapshot,
 * in descending name order (newest first).
 *
 * <p>A file whose validity check throws IOException is treated as invalid and skipped.
 *
 * @param n maximum number of files to return
 * @return the valid snapshot files found
 */
protected List<File> findNValidSnapshots(int n) {
    List<File> descending = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    List<File> valid = new ArrayList<>();
    for (File candidate : descending) {
        try {
            if (!SnapStream.isValidSnapshot(candidate)) {
                continue;
            }
        } catch (IOException ignored) {
            // Unreadable file: skip it like any other invalid snapshot.
            continue;
        }
        valid.add(candidate);
        if (valid.size() == n) {
            break;
        }
    }
    return valid;
}
/**
 * Returns up to {@code n} snapshot files from snapDir, in descending name order (newest first).
 * Only file names are checked: a file is included when Util.getZxidFromName does not return -1
 * for it. Contents are not validated.
 *
 * @param n maximum number of files to return
 * @return the matching snapshot files
 */
public List<File> findNRecentSnapshots(int n) throws IOException {
    List<File> descending = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    List<File> recent = new ArrayList<>();
    for (File candidate : descending) {
        if (recent.size() == n) {
            break;
        }
        boolean namedWithZxid = Util.getZxidFromName(candidate.getName(), SNAPSHOT_FILE_PREFIX) != -1;
        if (namedWithZxid) {
            recent.add(candidate);
        }
    }
    return recent;
}
/**
 * Writes {@code header} followed by the serialized data tree and sessions to {@code oa}.
 *
 * @throws IllegalStateException if {@code header} is null
 * @throws IOException if writing fails
 */
protected void serialize(
        DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException {
    // Fail fast with a clear message instead of an NPE from header.serialize().
    if (header == null) {
        throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
    }
    header.serialize(oa, "fileheader");
    SerializeUtils.serializeSnapshot(dt, oa, sessions);
}
/**
 * Writes a complete snapshot of {@code dt} and {@code sessions} to {@code snapShot}.
 *
 * <p>The stream holds: FileHeader (SNAP_MAGIC, VERSION, dbId), then tree and sessions, then a
 * seal. The zxid-digest section and the lastProcessedZxid section follow, each with its own
 * seal, but only when the DataTree reports that it wrote them. lastSnapshotInfo is updated
 * afterwards.
 *
 * @throws IOException if this FileSnap is closed or writing fails
 */
public synchronized void serialize(
        DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException {
    if (close) {
        throw new IOException("FileSnap has already been closed");
    }
    try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
        OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
        serialize(dt, sessions, oa, new FileHeader(SNAP_MAGIC, VERSION, dbId));
        SnapStream.sealStream(snapOS, oa);
        if (dt.serializeZxidDigest(oa)) {
            SnapStream.sealStream(snapOS, oa);
        }
        if (dt.serializeLastProcessedZxid(oa)) {
            SnapStream.sealStream(snapOS, oa);
        }
        long snapZxid = Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX);
        lastSnapshotInfo = new SnapshotInfo(snapZxid, snapShot.lastModified() / 1000);
    }
}
DatadirCleanupManager
啟動周期任務
清理過期文件,保留最新的snapRetainCount個snapshot文件和對應的txnlog文件,將其余過期的文件刪除掉。
purgeInterval參數指定執行周期(小時),默認0不開啟清理功能。
/**
 * Schedules the purge task on a daemon Timer thread named "PurgeTask". It runs immediately and
 * then every purgeInterval hours.
 *
 * <p>Does nothing if the task is already started or purgeInterval is not positive.
 */
public void start() {
    boolean alreadyStarted = purgeTaskStatus == PurgeTaskStatus.STARTED;
    // Don't schedule the purge task with zero or negative purge interval.
    if (alreadyStarted || purgeInterval <= 0) {
        return;
    }
    timer = new Timer("PurgeTask", true);
    TimerTask purgeTask = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    long periodMillis = TimeUnit.HOURS.toMillis(purgeInterval);
    timer.scheduleAtFixedRate(purgeTask, 0, periodMillis);
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
PurgeTask
/**
 * Runs one purge pass via PurgeTxnLog.purge, keeping snapRetainCount snapshots.
 */
public void run() {
    try {
        PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
    } catch (Exception ignored) {
        // Deliberately swallowed: an exception escaping a TimerTask terminates the Timer's
        // thread, which would cancel all future purges.
        // NOTE(review): failures are currently invisible; consider logging them.
    }
}
PurgeTxnLog.purge方法:
/**
 * Keeps the newest {@code num} valid snapshots and purges older snapshot and txn log files via
 * purgeOlderSnapshots.
 *
 * @param dataDir transaction log directory
 * @param snapDir snapshot directory
 * @param num number of snapshots to retain; must be at least 3
 * @throws IllegalArgumentException if {@code num} is less than 3
 * @throws IOException if purging fails
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    List<File> retained = txnLog.findNValidSnapshots(num);
    if (retained.isEmpty()) {
        return;
    }
    // The list is newest first, so its last element is the oldest snapshot to keep.
    File oldestRetained = retained.get(retained.size() - 1);
    purgeOlderSnapshots(txnLog, oldestRetained);
}
ContainerManager
負責清理container節點,只能由leader管理。啟動后,定期檢查cversion>0且沒有子級的container節點和ttl節點。嘗試刪除節點,刪除的結果并不重要。如果提議失敗或容器節點不為空,則沒有任何危害。
總結
以上是生活随笔為你收集整理的zookeeper源码(05)数据存储的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [python]常用配置读取方法
- 下一篇: 元数据管理平台对比预研 Atlas VS