【笔记】从 Paxos 到 Zookeeper:第七章 Zookeeper 技术内幕之客户端
文章目錄
- 系統模型
- 數據模型
- 節點特性
- 版本-保證分布式數據原子性操作
- Watcher-數據變更通知
- ACL-保障數據安全
- 序列化與協議
- 序列化
- 通信協議
- 客戶端
- 一次會話的創建過程
- 初始化階段
- 會話創建階段
- 響應處理階段
- 服務器地址列表
- ClientCnxn:網絡 I/O
- 會話
系統模型
這一節首先從數據模型、節點特性、版本、Watcher 和 ACL 五方面來講述 Zookeeper 的系統模型。
數據模型
ZooKeeper 的視圖結構和 Unix 文件系統非常類似,但沒有引入傳統文件系統中目錄和文件的概念,而是使用了特有的“數據節點”概念,稱為 ZNode。ZNode 是 Zookeeper 中數據的最小單元,每個 ZNode 上都可以保存數據,同時還可以掛載子節點,因此構成了一個層次化的命名空間,可以稱之為“樹”。
廣義上說,事務是應用程序中一系列操作的集合。狹義上的數據庫中的事務一般包含了一系列對數據庫有序的讀寫操作,數據庫事務通常具備 ACID 特性。在 ZooKeeper 中事務是指能夠改變 ZooKeeper 服務器狀態的操作,一般包括數據節點創建、刪除、更新和客戶端會話創建于失效等操作。對于每一個事務請求,ZooKeeper 會為其分配一個全局唯一的事務 ID,稱為 ZXID,通常是一個64位的數字。
節點特性
ZooKeeper 中每個節點都是有生命周期的,其生命周期的長短取決于數據節點的節點類型。在 ZooKeeper 中,節點類型可以分為:持久節點 PERSISTENT、臨時節點 EPHEMERAL、順序節點 SEQUENTIAL 三大類,經過組合使用可以產生四種組合型節點類型:
ZooKeeper 上每個數據節點除了存儲數據內容之外,還存儲了數據節點本身的一些狀態信息,這些狀態信息對應原生 API 里的 Stat 類:
public class Stat implements Record {private long czxid; // 節點被創建時的事務 IDprivate long mzxid; // 節點最近一次更新的事務 IDprivate long ctime; // 創建時間private long mtime; // 最近更新時間private int version; // 版本號private int cversion; // 子節點的版本號private int aversion; // 節點的 ACL 版本號private long ephemeralOwner; // 創建該臨時節點的會話的 sessionID,如果是持久節點,其值為 0private int dataLength; // 數據內容長度private int numChildren;// 子節點個數private long pzxid; // 子節點列表最后一次被修改的事務 ID,不包含子節點內容的修改 }版本-保證分布式數據原子性操作
ZooKeeper 為數據節點引入了版本的概念,每個數據節點都具有三種類型的版本信息,對數據節點的任何變更都會引起版本號的變化。version 從 0 開始,表示“節點自創建后,其內容被更新過 0 次”,即使更新時內容本身沒有變化,version 值也會發生變化。
version 屬性最大的用處是用來實現樂觀鎖機制中的“寫入校驗”,在 ZooKeeper 原生 API setData 方法里就有一個 version 參數。
Watcher-數據變更通知
一個典型的發布訂閱系統能夠讓多個同時監聽某一主題對象,當這個主題對象自身發生變化時,通知所有訂閱者。ZooKeeper 中引入了 Watcher 機制來實現這種分布式的通知功能。ZooKeeper 允許客戶端向服務端注冊一個 Watcher 監聽,當指定事件觸發 Watcher 時,服務端會向客戶端發送一個事件通知。
ZooKeeper 的 Watcher 機制主要包括客戶端線程、客戶端 WatchManager 和 ZooKeeper 服務器三個部分。客戶端在向 ZooKeeper 服務器注冊 Watcher 的同時,會把 Watcher 存儲在客戶端的 WatchManager 中。當服務端觸發 Watcher 事件后,會向客戶端發送通知,客戶端線程從 WatchManager 中取出對應的 Watcher 對象來執行回調邏輯。
接口類 Watcher 用于表示一個標準的事件處理器,其定義了事件通知相關邏輯,包含 KeeperState 和 EventType 兩個枚舉類,分別代表了通知狀態和事件類型。同時,也定義了回調方法 process。
public interface Watcher {void process(WatchedEvent var1);public interface Event {public static enum EventType {None(-1),NodeCreated(1), // None 之外的事件類型,只在 KeeperState 為 SyncConnected 時出現NodeDeleted(2),NodeDataChanged(3),NodeChildrenChanged(4);private final int intValue;private EventType(int intValue) {this.intValue = intValue;}}public static enum KeeperState {/** @deprecated */@DeprecatedUnknown(-1),Disconnected(0),/** @deprecated */@DeprecatedNoSyncConnected(1),SyncConnected(3),AuthFailed(4), ConnectedReadOnly(5),SaslAuthenticated(6),Expired(-112);private final int intValue;private KeeperState(int intValue) {this.intValue = intValue;}}} }AuthFailed 事件的觸發條件并不簡單是因為沒有權限,一般是因為采用了錯誤的權限 Scheme。
回調方法 process 只有一個類型為 WatchedEvent 的參數,該類有三個屬性:狀態、事件類型、節點路徑。
public class WatchedEvent {private final KeeperState keeperState;private final EventType eventType;private String path; }客戶端在發送 Watch 信息給服務端時,并沒有把 Watcher 對象發送過去,只是發送了 watcher 標志位置為了 true。服務端收到請求后,判斷如果 watcher 為 true,就把代表客戶端服務端連接的對象 ServerCnxn 和節點路徑 Path 的映射關系放入 WatcherManager 中,也就是只是把需要 watcher 的連接存了起來。
ACL-保障數據安全
在 Unix 文件系統中,使用的是 UGO 權限控制機制,UGO 就是針對一個文件或目錄,對創建者(User)、創建者所在的組(Group)、其他用戶(Other)分別配置不同的權限。
在 ZooKeeper 中使用的權限控制方式叫 ACL,即訪問控制列表,是一種新穎的、更細粒度的權限控制方式。ACL 機制有三個概念:權限模式(Scheme)、授權對象(ID)、權限(Permission),通常使用“scheme?permission”來表示一個有效的 ACL 信息。
權限模式用來確定權限驗證中使用的檢驗策略,有以下四種:
授權對象 ID 指的是權限賦予的用戶或一個指定實體,例如 IP 地址或機器。在不同的權限模式下,授權對象是不同的,IP 權限模式下授權對象是 IP,其他權限模式下授權對象都是權限標識“username:password”。
權限是指通過權限檢查后運行執行的操作,在 ZooKeeper 中有五類:
以下命令的含義是,權限模式為 Digest,把數據節點 /test 的全部權限 cdrwa 授予授權對象“wkp:NrLAZ6FuRnaPGI93r1uPKD67MLw=”。
setAcl /test digest:wkp:NrLAZ6FuRnaPGI93r1uPKD67MLw=:cdrwa盡管 ZooKeeper 已經為我們提供了上述的四種權限模式,同時也提供給我們能夠自定義自己權限的方式——實現接口
org.apache.zookeeper.server.auth.AuthenticationProvider,并在啟動參數或配置文件里配置接口實現即可。
序列化與協議
序列化
Jute 是 ZooKeeper 中的序列化組件,幾乎沒有其他組件使用,下面簡單講講其序列化方法和原理。
每一個要使用 Jute 序列化的類,都需要實現 Record 接口,其有兩個方法:serialize 和 deserialize。下面是一個實現了 Record 接口的 Bean 示例:
public interface Record {public void serialize(OutputArchive archive, String tag)throws IOException;public void deserialize(InputArchive archive, String tag)throws IOException; }public class TestBean implements Record {private int intV;private String stringV;public TestBean() {}public TestBean(int intV, String stringV) {this.intV = intV;this.stringV = stringV;}@Overridepublic void deserialize(InputArchive archive, String tag)throws IOException {archive.startRecord(tag);this.intV = archive.readInt("intV");this.stringV = archive.readString("stringV");archive.endRecord(tag);}@Overridepublic void serialize(OutputArchive archive, String tag) throws IOException {archive.startRecord(this, tag);archive.writeInt(intV, "intV");archive.writeString(stringV, "stringV");archive.endRecord(this, tag);} }對 TestBean 進行序列化和反序列化的方法是:
public class BinaryTest1 {public static void main(String[] args) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);new TestBean(1, "testbean1").serialize(boa, "tag1");byte array[] = baos.toByteArray();ByteArrayInputStream bais = new ByteArrayInputStream(array);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);TestBean newBean1 = new TestBean();newBean1.deserialize(bia, "tag1");System.out.println("intV = " + newBean1.getIntV() + ",stringV = "+ newBean1.getStringV());bais.close();baos.close();} }我們可以看到 TestBean 實現了 Record 的兩個方法,其序列化主要是使用了 InputArchive 和 OutputArchive,它兩有多種實現,最常用的是 BinaryOutputArchive 和 BinaryInputArchive。在上面的代碼里,序列化時,便使用了 BinaryOutputArchive,看下面的源碼可知,其底層其實是把數據輸出到了 ByteArrayOutputStream。
public class BinaryOutputArchive implements OutputArchive {private ByteBuffer bb = ByteBuffer.allocate(1024);private DataOutput out;public static BinaryOutputArchive getArchive(OutputStream strm) {return new BinaryOutputArchive(new DataOutputStream(strm));}/** Creates a new instance of BinaryOutputArchive */public BinaryOutputArchive(DataOutput out) {this.out = out;}public void writeByte(byte b, String tag) throws IOException {out.writeByte(b);}public void writeBool(boolean b, String tag) throws IOException {out.writeBoolean(b);}public void writeInt(int i, String tag) throws IOException {out.writeInt(i);}public void writeLong(long l, String tag) throws IOException {out.writeLong(l);}public void writeFloat(float f, String tag) throws IOException {out.writeFloat(f);}public void writeDouble(double d, String tag) throws IOException {out.writeDouble(d);}/*** create our own char encoder to utf8. This is faster * then string.getbytes(UTF8).* @param s the string to encode into utf8* @return utf8 byte sequence.*/final private ByteBuffer stringToByteBuffer(CharSequence s) {bb.clear();final int len = s.length();for (int i = 0; i < len; i++) {if (bb.remaining() < 3) {ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1);bb.flip();n.put(bb);bb = n;}char c = s.charAt(i);if (c < 0x80) {bb.put((byte) c);} else if (c < 0x800) {bb.put((byte) (0xc0 | (c >> 6)));bb.put((byte) (0x80 | (c & 0x3f)));} else {bb.put((byte) (0xe0 | (c >> 12)));bb.put((byte) (0x80 | ((c >> 6) & 0x3f)));bb.put((byte) (0x80 | (c & 0x3f)));}}bb.flip();return bb;}public void writeString(String s, String tag) throws IOException {if (s == null) {writeInt(-1, "len");return;}ByteBuffer bb = stringToByteBuffer(s);writeInt(bb.remaining(), "len");out.write(bb.array(), bb.position(), bb.limit());}public void writeBuffer(byte barr[], String tag)throws IOException {if (barr == null) {out.writeInt(-1);return;}out.writeInt(barr.length);out.write(barr);}public void writeRecord(Record r, String tag) throws IOException {r.serialize(this, tag);}public void startRecord(Record r, String tag) throws IOException {}public void endRecord(Record r, String tag) throws IOException {} }BinaryInputArchive 也是類似的,就不細講了。
通信協議
基于 TCP/IP 協議,Zookeeper 實現了自己的通信協議來玩按成客戶端與服務端、服務端與服務端之間的網絡通信,對于請求,主要包含請求頭和請求體,對于響應,主要包含響應頭和響應體。
對于請求協議而言,如下為獲取節點數據請求的完整協議定義:
圖里的 bit offset,其實應該是字節 byte offset,因為 4 個 bit 位肯定不能完全表達數據長度。其中 xid 用于記錄客戶端請求發起的先后序號,用來確保單個客戶端請求的響應順序,type 代表請求的操作類型,如創建節點(OpCode.create)、刪除節點(OpCode.delete)、獲取節點數據(OpCode.getData)。
協議的請求主體內容部分,包含了請求的所有操作內容,不同的請求類型請求體不同。對于會話創建而言,其請求體如下:
class ConnectRequest {int protocolVersion;long lastZxidSeen;int timeOut;long sessionId;buffer passwd;}Zookeeper 客戶端和服務器在創建會話時,會發送 ConnectRequest 請求,該請求包含協議版本號 protocolVersion、最近一次接收到服務器 ZXID lastZxidSeen、會話超時時間 timeOut、會話標識 sessionId 和會話密碼 passwd。
對于響應協議而言,如下為獲取節點數據響應的完整協議定義:
xid 與請求頭中的 xid 一致,zxid 表示 Zookeeper 服務器上當前最新的事務 ID,err 則是一個錯誤碼,表示當請求處理過程出現異常情況時,就會在錯誤碼中標識出來,常見的包括處理成功(Code.OK)、節點不存在(Code.NONODE)、沒有權限(Code.NOAUTH)。
協議的響應主體內容部分,包含了響應的所有數據,不同的響應類型請求體不同。對于會話創建而言,其響應體如下:
class ConnectResponse {int protocolVersion;int timeOut;long sessionId;buffer passwd;}針對客戶端的會話創建請求,服務端會返回客戶端一個 ConnectResponse 響應,該響應體包含了版本號 protocolVersion、會話的超時時間 timeOut、會話標識 sessionId 和會話密碼 passwd。
客戶端
客戶端是開發人員使用 ZooKeeper 最主要的途徑,主要由以下幾個核心組件構成:
一次會話的創建過程
初始化階段
會話創建階段
響應處理階段
服務器地址列表
在使用ZooKeeper構造方法時,用戶傳入的ZooKeeper服務器地址列表,即connectString參數,通常是這樣一個使用英文狀態逗號分隔的多個IP地址和端口的字符串:
192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181在 3.2.0 之后的新版本客戶端里,也增加了命名空間特性。如果一個 ZooKeeper 客戶端配置了 Chroot,那么客戶端對服務器的任何操作,都將會被限制在自己的命名空間下。
192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181/namespaceZooKeeper客戶端允許我們將服務器的所有地址都配置在一個字符串上,于是一個問題就來了:ZooKeeper客戶端在連接服務器的過程中,是如何從這個服務器列表中選擇服務器機器的呢?是按序訪問,還是隨機訪問呢?
在 ZooKeeper 中,是通過 HostProvider 來提供地址管理的,有個 next 方法用于獲取下一個要訪問的地址。其默認實現是 StaticHostProvider,next 方法的邏輯是先將地址隨機打亂,組成一個循環隊列,然后一直按序訪問。
public interface HostProvider {public int size();public InetSocketAddress next(long spinDelay);public void onConnected(); }默認實現的 StaticHostProvider 邏輯比較簡單,可以考慮對其進行擴展:
ClientCnxn:網絡 I/O
ClientCnxn 是 ZooKeeper 客戶端的核心工作類,負責維護客戶端和服務端之間的網絡連接,并進行一系列通信。ClientCnxn 有下面這些屬性:
可以看到隊列里元素類型是 Packet,Packet 是對協議層的一個封裝,看起來屬性很多,但實際上只有 requestHeader、request、readOnly 會傳遞到服務端。
static class Packet {RequestHeader requestHeader; // 請求頭ReplyHeader replyHeader; // 響應頭Record request; // 請求體Record response; // 響應體ByteBuffer bb;/** Client's view of the path (may differ due to chroot) **/String clientPath; // 節點路徑/** Servers's view of the path (may differ due to chroot) **/String serverPath;boolean finished;AsyncCallback cb;Object ctx;WatchRegistration watchRegistration; // 注冊的 watcherpublic boolean readOnly;}會話
會話是 ZooKeeper 中最重要的概念之一,客戶端與服務端之間的任何交互操作都與會話息息相關。
會話狀態包括 CONNECTING、CONNECTED、RECONNECTING、RECONNECTED、CLOSED 等。
Session是Zookeeper中的會話實體,代表了一個客戶端會話,其包含了如下四個屬性:
Zookeeper 為了保證請求會話的全局唯一性,在 SessionTracker 初始化時,調用 initializeNextSession 方法生成一個 sessionID,之后在 Zookeeper 運行過程中,會在該 sessionID 的基礎上為每個會話進行分配,初始化算法如下:
public static long initializeNextSession(long id) {long nextSid = 0;// 無符號右移8位使為了避免左移24后,再右移8位出現負數而無法通過高8位確定sid值nextSid = (System.currentTimeMillis() << 24) >>> 8;nextSid = nextSid | (id << 56);return nextSid; }其中的 id 表示配置在 myid 文件中的值,通常是一個整數,如1、2、3。該算法的高 8 位確定了所在機器,后 56 位使用當前時間的毫秒表示進行隨機。SessionTracker 是 Zookeeper 服務端的會話管理器,負責會話的創建、管理和清理等工作。
Zookeeper的會話管理主要是通過SessionTracker來負責,其采用了分桶策略(將類似的會話放在同一區塊中進行管理)進行管理,以便Zookeeper對會話進行不同區塊的隔離處理以及同一區塊的統一處理。
Zookeeper將所有的會話都分配在不同的區塊一種,分配的原則是每個會話的下次超時時間點(ExpirationTime)。ExpirationTime指該會話最近一次可能超時的時間點。同時,Zookeeper Leader服務器在運行過程中會定時地進行會話超時檢查,時間間隔是ExpirationInterval,默認為tickTime的值,ExpirationTime的計算時間如下
ExpirationTime = ((CurrentTime + SessionTimeOut) / ExpirationInterval + 1) * ExpirationInterval會了保持客戶端會話的有效性,客戶端會在會話超時時間過期范圍內向服務端發送PING請求來保持會話的有效性(心跳檢測)。同時,服務端需要不斷地接收來自客戶端的心跳檢測,并且需要重新激活對應的客戶端會話,這個重新激活過程稱為TouchSession。
當SessionTracker的會話超時線程檢查出已經過期的會話后,就開始進行會話清理工作,大致可以分為如下七步。
- 節點刪除請求,刪除的目標節點正好是上述臨時節點中的一個。
- 臨時節點創建請求,創建的目標節點正好是上述臨時節點中的一個。
- 對于第一類請求,需要將所有請求對應的數據節點路徑從當前臨時節點列表中移出,以避免重復刪除,對于第二類請求,需要將所有這些請求對應的數據節點路徑添加到當前臨時節點列表中,以刪除這些即將被創建但是尚未保存到內存數據庫中的臨時節點。
當客戶端與服務端之間的網絡連接斷開時,Zookeeper 客戶端會自動進行反復的重連,直到最終成功連接上 Zookeeper 集群中的一臺機器。
總結
以上是生活随笔為你收集整理的【笔记】从 Paxos 到 Zookeeper:第七章 Zookeeper 技术内幕之客户端的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一文了解如何使用移动应用安全组件Soot
- 下一篇: tif构建金字塔失败arcgis_Arc