日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式服务框架 Zookeeper — 管理分布式环境中的数据

發(fā)布時間:2025/7/25 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式服务框架 Zookeeper — 管理分布式环境中的数据 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

FROM: http://www.superwu.cn/2014/11/26/1461


本節(jié)本來是要介紹ZooKeeper的實現(xiàn)原理,但是ZooKeeper的原理比較復雜,它涉及到了paxos算法、Zab協(xié)議、通信協(xié)議等相關知識,理解起來比較抽象所以還需要借助一些應用場景,來幫我們理解。由于內(nèi)容比較多,一口氣吃不成胖子,得慢慢來一步一個腳印,因此我對后期ZooKeeper的學習規(guī)劃如下:

第一階段:

|---理解ZooKeeper的應用

????|---ZooKeeper是什么

????|---ZooKeeper能干什么

????|---ZooKeeper 怎么使用

第二階段:

|---理解ZooKeeper原理準備

????|---了解paxos

????|---理解 zab原理

????|---理解選舉/同步流程

第三階段:

????|---深入ZooKeeper原理

????????|---分析源碼

????????|---嘗試開發(fā)分布式應用

由于內(nèi)容較多,而且理解較為復雜,所以每個階段分開來學習和介紹,那么本文主要介紹的的是第一階段,該階段一般應該放在前面介紹,但感覺像一些ZooKeeper應用案例,如果沒有一定的ZooKeeper基礎,理解起來也比較抽象,
所以放在這介紹。大家可以對比一下前面的應用程序,來對比理解一下前面的那些應用到底用到ZooKeeper的那些功能,來進一步理解ZooKeeper的實現(xiàn)理念,由于網(wǎng)上關于這方面的介紹比較多,如果一些可愛的博友對該內(nèi)容已經(jīng)比較了解,那么您可以不用往下看了,繼續(xù)下一步學習。

一、ZooKeeper產(chǎn)生背景

1.1 分布式的發(fā)展

分布式這個概念我想大家并不陌生,但真正實戰(zhàn)開始還要從google說起,很早以前在實驗室中分布式被人提出,可是說是計算機內(nèi)入行較為復雜學習較為困難的技術,并且市場也并不成熟,因此大規(guī)模的商業(yè)應用一直未成出現(xiàn),但從Google 發(fā)布了MapReduce?DFS?以及Bigtable的論文之后,分布式在計算機界的格局就發(fā)生了變化,從架構上實現(xiàn)了分布式的難題,并且成熟的應用在了海量數(shù)據(jù)存儲計算上,其集群的規(guī)模也是當前世界上最為龐大的。

DFS 為基礎的分布式計算框架key、value?數(shù)據(jù)高效的解決運算的瓶頸,而且開發(fā)人員不用再寫復雜的分布式程序,只要底層框架完備開發(fā)人員只要用較少的代碼就可以完成分布式程序的開發(fā),這使得開發(fā)人員只需要關注業(yè)務邏輯的即可。Google 在業(yè)界技術上的領軍地位,讓業(yè)界望塵莫及的技術實力,IT 因此也是對Google 所退出的技術十分推崇。在最近幾年中分布式則是成為了海量數(shù)據(jù)存儲以及計算、高并發(fā)、高可靠性、高可用性的解決方案。

1.2 ZooKeeper的產(chǎn)生

眾所周知通常分布式架構都是中心化的設計,就是一個主控機連接多個處理節(jié)點。問題可以從這里考慮,當主控機失效時,整個系統(tǒng)則就無法訪問了,所以保證系統(tǒng)的高可用性是非常關鍵之處,也就是要保證主控機的高可用性。分布式鎖就是一個解決該問題的較好方案,多主控機搶一把鎖。在這里我們就涉及到了我們的重點Zookeeper。

ZooKeeper是什么,chubby 我想大家都不會陌生的,chubby 是實現(xiàn)Google 的一個分布式鎖的實現(xiàn),運用到了paxos 算法解決的一個分布式事務管理的系統(tǒng)。Zookeeper 就是雅虎模仿強大的Google chubby 實現(xiàn)的一套分布式鎖管理系統(tǒng)。同時,Zookeeper 分布式服務框架是Apache Hadoop的一個子項目,它是一個針對大型分布式系統(tǒng)的可靠協(xié)調(diào)系統(tǒng),它主要是用來解決分布式應用中經(jīng)常遇到的一些數(shù)據(jù)管理問題,可以高可靠的維護元數(shù)據(jù)。提供的功能包括:配置維護、名字服務、分布式同步、組服務等。ZooKeeper的設計目標就是封裝好復雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩(wěn)定的系統(tǒng)提供給用戶。

1.3 ZooKeeper的使用

Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統(tǒng)的一致性問題,它能提供基于類似于文件系統(tǒng)的目錄節(jié)點樹方式的數(shù)據(jù)存儲但是 Zookeeper 并不是用來專門存儲數(shù)據(jù)的,它的作用主要是用來維護監(jiān)控你存儲的數(shù)據(jù)的狀態(tài)變化。通過監(jiān)控這些數(shù)據(jù)狀態(tài)的變化,從而可以達到基于數(shù)據(jù)的集群管理,后面將
會詳細介紹 Zookeeper 能夠解決的一些典型問題。

注意一下這里的"數(shù)據(jù)"是有限制的:

(1)?從數(shù)據(jù)大小來看:我們知道ZooKeeper的數(shù)據(jù)存儲在一個叫ReplicatedDataBase?的數(shù)據(jù)庫中,該數(shù)據(jù)是一個內(nèi)存數(shù)據(jù)庫,既然是在內(nèi)存當中,我就應該知道該數(shù)據(jù)量就應該不會太大,這一點上就與hadoopHDFS有了很大的區(qū)別,HDFS的數(shù)據(jù)主要存儲在磁盤上,因此數(shù)據(jù)存儲主要是HDFS的事,而ZooKeeper主要是協(xié)調(diào)功能,并不是用來存儲數(shù)據(jù)的。

(2) 從數(shù)據(jù)類型來看:正如前面所說的,ZooKeeper的數(shù)據(jù)在內(nèi)存中,由于內(nèi)存空間的限制,那么我們就不能在上面隨心所欲的存儲數(shù)據(jù),所以ZooKeeper存儲的數(shù)據(jù)都是我們所關心的數(shù)據(jù)而且數(shù)據(jù)量還不能太大,而且還會根據(jù)我們要以實現(xiàn)的功能來選擇相應的數(shù)據(jù)。簡單來說,干什么事存什么數(shù)據(jù),ZooKeeper所實現(xiàn)的一切功能,都是由ZK節(jié)點的性質(zhì)該節(jié)點所關聯(lián)的數(shù)據(jù)實現(xiàn)的,至于關聯(lián)什么數(shù)據(jù)那就要看你干什么事了。

例如:

 ?、?/span>集群管理:利用臨時節(jié)點特性,節(jié)點關聯(lián)的是機器的主機名、IP地址等相關信息,集群單點故障也屬于該范疇。 ?

? ? ②統(tǒng)一命名:主要利用節(jié)點的唯一性和目錄節(jié)點樹結(jié)構。

  ③配置管理:節(jié)點關聯(lián)的是配置信息。

  ④分布式鎖:節(jié)點關聯(lián)的是要競爭的資源。

二、ZooKeeper應用場景

ZooKeeper是一個高可用的分布式數(shù)據(jù)管理與系統(tǒng)協(xié)調(diào)框架。基于對Paxos算法的實現(xiàn),使該框架保證了分布式環(huán)境中數(shù)據(jù)的強一致性,也正是基于這樣的特性,使得zookeeper能夠應用于很多場景。需要注意的是,ZK并不是生來就為這些場景設計,都是后來眾多開發(fā)者根據(jù)框架的特性,摸索出來的典型使用方法。因此,我們也可以根據(jù)自己的需要來設計相應的場景實現(xiàn)。正如前文所提到的,ZooKeeper 實現(xiàn)的任何功能都離不開ZooKeeper的數(shù)據(jù)結(jié)構,任何功能的實現(xiàn)都是利用"Znode結(jié)構特性+節(jié)點關聯(lián)的數(shù)據(jù)"來實現(xiàn)的,好吧那么我們就看一下ZooKeeper數(shù)據(jù)結(jié)構有哪些特性。ZooKeeper數(shù)據(jù)結(jié)構如下圖所示:

圖2.1 ZooKeeper數(shù)據(jù)結(jié)構


Zookeeper 這種數(shù)據(jù)結(jié)構有如下這些特點:

?每個子目錄項如 NameService 都被稱作為 znode,這個 znode 是被它所在的路徑唯一標識,如 Server1 這個 znode 的標識為?/NameService/Server1

?znode 可以有子節(jié)點目錄,并且每個 znode 可以存儲數(shù)據(jù),注意?EPHEMERAL?類型的目錄節(jié)點不能有子節(jié)點目錄;

?znode 是有版本的,每個 znode 中存儲的數(shù)據(jù)可以有多個版本,也就是一個訪問路徑中可以存儲多份數(shù)據(jù)

?znode 可以是臨時節(jié)點,一旦創(chuàng)建這個 znode 的客戶端與服務器失去聯(lián)系,這個 znode 也將自動刪除,Zookeeper 的客戶端和服務器通信采用長連接方式,每個客戶端和服務器通過心跳來保持連接,這個連接狀態(tài)稱為 session,如果 znode 是臨時節(jié)點,這個 session 失效,znode 也就刪除了;

?znode 的目錄名可以自動編號,如 App1 已經(jīng)存在,再創(chuàng)建的話,將會自動命名為 App2

?znode 可以被監(jiān)控,包括這個目錄節(jié)點中存儲的數(shù)據(jù)的修改,子節(jié)點目錄的變化等,一旦變化可以通知設置監(jiān)控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于這個特性實現(xiàn)的。

2.1數(shù)據(jù)發(fā)布與訂閱

(1) 典型場景描述

發(fā)布與訂閱即所謂的配置管理,顧名思義就是將數(shù)據(jù)發(fā)布到ZK節(jié)點上,供訂閱者動態(tài)獲取數(shù)據(jù),實現(xiàn)配置信息的集中式管理和動態(tài)更新。例如全局的配置信息地址列表等就非常適合使用。集中式的配置管理在應用集群中是非常常見的,一般商業(yè)公司內(nèi)部都會實現(xiàn)一套集中的配置管理中心,應對不同的應用集群對于共享各自配置的需求,并且在配置變更時能夠通知到集群中的每一個機器。

(2) 應用

?索引信息和集群中機器節(jié)點狀態(tài)存放在ZK的一些指定節(jié)點,供各個客戶端訂閱使用。

?系統(tǒng)日志(經(jīng)過處理后的)存儲,這些日志通常2-3天后被清除。

?應用中用到的一些配置信息集中管理,在應用啟動的時候主動來獲取一次,并且在節(jié)點上注冊一個Watcher,以后每次配置有更新,實時通知到應用,獲取最新配置信息。

?業(yè)務邏輯中需要用到的一些全局變量,比如一些消息中間件的消息隊列通常有個offset,這個offset存放在zk上,這樣集群中每個發(fā)送者都能知道當前的發(fā)送進度

?系統(tǒng)中有些信息需要動態(tài)獲取,并且還會存在人工手動去修改這個信息。以前通常是暴露出接口,例如JMX接口,有了ZK后,只要將這些信息存放到ZK節(jié)點上即可。

(3) 應用舉例

例如:同一個應用系統(tǒng)需要多臺 PC Server 運行,但是它們運行的應用系統(tǒng)的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每臺運行這個應用系統(tǒng)的 PC Server,這樣非常麻煩而且容易出錯。將配置信息保存在 Zookeeper 的某個目錄節(jié)點中,然后將所有需要修改的應用機器監(jiān)控配置信息的狀態(tài),一旦配置信息發(fā)生變化,每臺應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統(tǒng)中。ZooKeeper配置管理服務如下圖所示:

圖2.2 配置管理結(jié)構圖


Zookeeper很容易實現(xiàn)這種集中式的配置管理,比如將所需要的配置信息放到/Configuration?節(jié)點上,集群中所有機器一啟動就會通過Client/Configuration這個節(jié)點進行監(jiān)控【zk.exist("/Configuration″,true)】,并且實現(xiàn)Watcher回調(diào)方法process(),那么在zookeeper/Configuration節(jié)點下數(shù)據(jù)發(fā)生變化的時候,每個機器都會收到通知,Watcher回調(diào)方法將會被執(zhí)行,那么應用再取下數(shù)據(jù)即可【zk.getData("/Configuration″,false,null)】。

2.2統(tǒng)一命名服務(Name Service

(1) 場景描述

分布式應用中,通常需要有一套完整的命名規(guī)則,既能夠產(chǎn)生唯一的名稱又便于人識別和記住,通常情況下用樹形的名稱結(jié)構是一個理想的選擇,樹形的名稱結(jié)構是一個有層次的目錄結(jié)構,既對人友好又不會重復。說到這里你可能想到了 JNDI,沒錯 Zookeeper Name Service JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結(jié)構關聯(lián)到一定資源上,但是ZookeeperName Service 更加是廣泛意義上的關聯(lián),也許你并不需要將名稱關聯(lián)到特定資源上,你可能只需要一個不會重復名稱,就像數(shù)據(jù)庫中產(chǎn)生一個唯一的數(shù)字主鍵一樣。

(2) 應用

在分布式系統(tǒng)中,通過使用命名服務,客戶端應用能夠根據(jù)指定的名字來獲取資源服務的地址提供者等信息。被命名的實體通??梢允羌褐械?span style="color:blue">機器,提供的服務地址進程對象等等,這些我們都可以統(tǒng)稱他們?yōu)槊?#xff08;Name)。其中較為常見的就是一些分布式服務框架中的服務地址列表。通過調(diào)用ZK提供的創(chuàng)建節(jié)點的API,能夠很容易創(chuàng)建一個全局唯一的path,這個path就可以作為一個名稱。Name Service 已經(jīng)是Zookeeper 內(nèi)置的功能,你只要調(diào)用 Zookeeper API 就能實現(xiàn)。如調(diào)用 create 接口就可以很容易創(chuàng)建一個目錄節(jié)點。

(3) 應用舉例

阿里開源的分布式服務框架Dubbo中使用ZooKeeper來作為其命名服務,維護全局的服務地址列表。在Dubbo實現(xiàn)中:?服務提供者在啟動的時候,向ZK上的指定節(jié)點/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個操作就完成了服務的發(fā)布。?服務消費者啟動的時候,訂閱/dubbo/serviceName/providers目錄下的提供者URL地址,并向/dubbo/{serviceName} /consumers目錄下寫入自己的URL地址。
注意,所有向ZK上注冊的地址都是臨時節(jié)點,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。
另外,Dubbo還有針對服務粒度的監(jiān)控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費者的信息。

2.3分布通知/協(xié)調(diào)(Distribution of notification/coordination

(1) 典型場景描述

ZooKeeper中特有watcher注冊與異步通知機制,能夠很好的實現(xiàn)分布式環(huán)境下不同系統(tǒng)之間的通知與協(xié)調(diào),實現(xiàn)對數(shù)據(jù)變更的實時處理。使用方法通常是不同系統(tǒng)都對ZK上同一個znode進行注冊,監(jiān)聽znode的變化(包括znode本身內(nèi)容及子節(jié)點的),其中一個系統(tǒng)updateznode,那么另一個系統(tǒng)能夠收到通知,并作出相應處理。

(2) 應用

?另一種心跳檢測機制檢測系統(tǒng)被檢測系統(tǒng)之間并不直接關聯(lián)起來,而是通過ZK上某個節(jié)點關聯(lián),大大減少系統(tǒng)耦合。

?另一種系統(tǒng)調(diào)度模式:某系統(tǒng)由控制臺推送系統(tǒng)兩部分組成,控制臺的職責是控制推送系統(tǒng)進行相應的推送工作。管理人員在控制臺作的一些操作,實際上是修改了ZK上某些節(jié)點的狀態(tài),而ZK就把這些變化通知給他們注冊Watcher的客戶端,即推送系統(tǒng),于是,作出相應的推送任務。

?另一種工作匯報模式:一些類似于任務分發(fā)系統(tǒng)子任務啟動后,到ZK來注冊一個臨時節(jié)點,并且定時將自己的進度進行匯報(將進度寫回這個臨時節(jié)點),這樣任務管理者就能夠?qū)崟r知道任務進度。

總之,使用zookeeper來進行分布式通知和協(xié)調(diào)能夠大大降低系統(tǒng)之間的耦合。

2.4分布式鎖(Distribute Lock

(1) 場景描述

分布式鎖,這個主要得益于ZooKeeper為我們保證了數(shù)據(jù)的強一致性,即用戶只要完全相信每時每刻,zk集群中任意節(jié)點(一個zk server)上的相同znode的數(shù)據(jù)是一定是相同的。鎖服務可以分為兩類,一個是保持獨占,另一個是控制時序。

保持獨占,就是所有試圖來獲取這個鎖的客戶端,最終只有一個可以成功獲得這把鎖。通常的做法是把ZK上的一個znode看作是一把鎖,通過create znode的方式來實現(xiàn)。所有客戶端都去創(chuàng)建 /distribute_lock 節(jié)點,最終成功創(chuàng)建的那個客戶端也即擁有了這把鎖。

控制時序,就是所有試圖來獲取這個鎖的客戶端,最終都是會被安排執(zhí)行,只是有個全局時序了。做法和上面基本類似,只是這里 /distribute_lock 已經(jīng)預先存在,客戶端在它下面創(chuàng)建臨時有序節(jié)點。Zk的父節(jié)點(/distribute_lock)維持一份sequence,保證子節(jié)點創(chuàng)建的時序性,從而也形成了每個客戶端的全局時序。

(2) 應用

共享鎖在同一個進程中很容易實現(xiàn),但是在跨進程或者在不同 Server 之間就不好實現(xiàn)了。Zookeeper 卻很容易實現(xiàn)這個功能,實現(xiàn)方式也是需要獲得鎖的 Server 創(chuàng)建一個?EPHEMERAL_SEQUENTIAL?目錄節(jié)點,然后調(diào)用?getChildren方法獲取當前的目錄節(jié)點列表中最小的目錄節(jié)點是不是就是自己創(chuàng)建的目錄節(jié)點,如果正是自己創(chuàng)建的,那么它就獲得了這個鎖,如果不是那么它就調(diào)用?exists(String path, boolean watch) 方法并監(jiān)控 Zookeeper 上目錄節(jié)點列表的變化,一直到自己創(chuàng)建的節(jié)點是列表中最小編號的目錄節(jié)點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創(chuàng)建的目錄節(jié)點就行了。

圖 2.3 ZooKeeper實現(xiàn)Locks的流程圖


代碼清單1 TestMainClient 代碼

  • package org.zk.leader.election;

  • import?org.apache.log4j.xml.DOMConfigurator;
  • import?org.apache.zookeeper.WatchedEvent;
  • import?org.apache.zookeeper.Watcher;
  • import?org.apache.zookeeper.ZooKeeper;

  • import?java.io.IOException;

  • /**
  • ?* TestMainClient
  • ?* <p/>
  • ?* Author By: sunddenly工作室
  • ?* Created Date: 2014-11-13
  • ?*/
  • public?class?TestMainClient implements Watcher {
  • ????protected?static?ZooKeeper zk =?null;
  • ????protected?static?Integer mutex;
  • ????int?sessionTimeout = 10000;
  • ????protected?String root;
  • ????public?TestMainClient(String connectString) {
  • ????????if(zk ==?null){
  • ????????????try?{

  • ????????????????String configFile =?this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
  • ????????????????DOMConfigurator.configure(configFile);
  • ????????????????System.out.println("創(chuàng)建一個新的連接:");
  • ????????????????zk =?new?ZooKeeper(connectString, sessionTimeout,?this);
  • ????????????????mutex =?new?Integer(-1);
  • ????????????}?catch?(IOException e) {
  • ????????????????zk =?null;
  • ????????????}
  • ????????}
  • ????}
  • ???synchronized?public?void?process(WatchedEvent?event) {
  • ????????synchronized (mutex) {
  • ????????????mutex.notify();
  • ????????}
  • ????}
  • }
  • 清單 2 Locks 代碼

  • package?org.zk.locks;

  • import?org.apache.log4j.Logger;
  • import?org.apache.zookeeper.CreateMode;
  • import?org.apache.zookeeper.KeeperException;
  • import?org.apache.zookeeper.WatchedEvent;
  • import?org.apache.zookeeper.ZooDefs;
  • import?org.apache.zookeeper.data.Stat;
  • import?org.zk.leader.election.TestMainClient;

  • import?java.util.Arrays;
  • import?java.util.List;

  • /**
  • ?* locks
  • ?* <p/>
  • ?* Author By: sunddenly工作室
  • ?* Created Date: 2014-11-13 16:49:40
  • ?*/
  • public?class?Locks?extends?TestMainClient {
  • ????public?static?final?Logger logger = Logger.getLogger(Locks.class);
  • ????String myZnode;

  • ????public?Locks(String connectString, String root) {
  • ????????super(connectString);
  • ????????this.root = root;
  • ????????if?(zk !=?null) {
  • ????????????try?{
  • ????????????????Stat s = zk.exists(root,?false);
  • ????????????????if?(s ==?null) {
  • ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  • ????????????????}
  • ????????????}?catch?(KeeperException e) {
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e) {
  • ????????????????logger.error(e);
  • ????????????}
  • ????????}
  • ????}
  • ????void?getLock()?throws?KeeperException, InterruptedException{
  • ????????List<String> list = zk.getChildren(root,?false);
  • ????????String[] nodes = list.toArray(new?String[list.size()]);
  • ????????Arrays.sort(nodes);
  • ????????if(myZnode.equals(root+"/"+nodes[0])){
  • ????????????doAction();
  • ????????}
  • ????????else{
  • ????????????waitForLock(nodes[0]);
  • ????????}
  • ????}
  • ????void?check()?throws?InterruptedException, KeeperException {
  • ????????myZnode = zk.create(root + "/lock_" ,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  • ????????getLock();
  • ????}
  • ????void?waitForLock(String lower)?throws?InterruptedException, KeeperException {
  • ????????Stat stat = zk.exists(root + "/" + lower,true);
  • ????????if(stat !=?null){
  • ????????????mutex.wait();
  • ????????}
  • ????????else{
  • ????????????getLock();
  • ????????}
  • ????}
  • ????@Override
  • ????public?void?process(WatchedEvent event) {
  • ????????if(event.getType() == Event.EventType.NodeDeleted){
  • ????????????System.out.println("得到通知");
  • ????????????super.process(event);
  • ????????????doAction();
  • ????????}
  • ????}
  • ????/**
  • ?????*?執(zhí)行其他任務
  • ?????*/
  • ????private?void?doAction(){
  • ????????System.out.println("同步隊列已經(jīng)得到同步,可以開始執(zhí)行后面的任務了");
  • ????}

  • ????public?static?void?main(String[] args) {
  • ????????String connectString = "localhost:2181";

  • ????????Locks lk =?new?Locks(connectString, "/locks");
  • ????????try?{
  • ????????????lk.check();
  • ????????}?catch?(InterruptedException e) {
  • ????????????logger.error(e);
  • ????????}?catch?(KeeperException e) {
  • ????????????logger.error(e);
  • ????????}
  • ????}
  • }
  • 2.5 集群管理(Cluster Management

    (1) 典型場景描述

    集群機器監(jiān)控

    這通常用于那種對集群中機器狀態(tài),機器在線率有較高要求的場景,能夠快速對集群中機器變化作出響應。這樣的場景中,往往有一個監(jiān)控系統(tǒng),實時檢測集群機器是否存活。過去的做法通常是:監(jiān)控系統(tǒng)通過某種手段(比如ping)定時檢測每個機器,或者每個機器自己定時向監(jiān)控系統(tǒng)匯報"我還活著"。
    這種做法可行,但是存在兩個比較明顯的問題:

    ?集群中機器有變動的時候,牽連修改的東西比較多。

    ?有一定的延時。

    利用ZooKeeper中兩個特性,就可以實施另一種集群機器存活性監(jiān)控系統(tǒng):

    ?客戶端在節(jié)點 x 上注冊一個Watcher,那么如果 x 的子節(jié)點變化了,會通知該客戶端。

    ?創(chuàng)建EPHEMERAL類型的節(jié)點,一旦客戶端和服務器的會話結(jié)束或過期,那么該節(jié)點就會消失。

    Master選舉:

    Master選舉則是zookeeper中最為經(jīng)典的使用場景了,在分布式環(huán)境中,相同的業(yè)務應用分布在不同的機器上,有些業(yè)務邏輯,例如一些耗時的計算,網(wǎng)絡I/O處,往往只需要讓整個集群中的某一臺機器進行執(zhí)行,其余機器可以共享這個結(jié)果,這樣可以大大減少重復勞動,提高性能,于是這個master選舉便是這種場景下的碰到的主要問題。

    利用ZooKeeper中兩個特性,就可以實施另一種集群中Master選舉:

    ?利用ZooKeeper的強一致性,能夠保證在分布式高并發(fā)情況下節(jié)點創(chuàng)建的全局唯一性,即:同時有多個客戶端請求創(chuàng)建 /Master 節(jié)點,最終一定只有一個客戶端請求能夠創(chuàng)建成功。利用這個特性,就能很輕易的在分布式環(huán)境中進行集群選舉了。

    另外,這種場景演化一下,就是動態(tài)Master選舉。這就要用到?EPHEMERAL_SEQUENTIAL類型節(jié)點的特性了,這樣每個節(jié)點會自動被編號。允許所有請求都能夠創(chuàng)建成功,但是得有個創(chuàng)建順序,每次選取序列號最小的那個機器作為Master

    (2) 應用

    在搜索系統(tǒng)中,如果集群中每個機器都生成一份全量索引,不僅耗時,而且不能保證彼此間索引數(shù)據(jù)一致。因此讓集群中的Master來迚行全量索引的生成,然后同步到集群中其它機器。另外,Master選丼的容災措施是,可以隨時迚行手動挃定master,就是說應用在zk在無法獲取master信息時,可以通過比如http方式,向一個地方獲取master
    l
    Hbase中,也是使用ZooKeeper來實現(xiàn)動態(tài)HMaster的選舉。在Hbase實現(xiàn)中,會在ZK上存儲一些ROOT表的地址和HMaster的地址,HRegionServer也會把自己以臨時節(jié)點(Ephemeral)的方式注冊到Zookeeper中,使得HMaster可以隨時感知到各個HRegionServer的存活狀態(tài),同時,一旦HMaster出現(xiàn)問題,會重新選丼出一個HMaster來運行,從而避免了HMaster的單點問題的存活狀態(tài),同時,一旦HMaster出現(xiàn)問題,會重新選丼出一個HMaster來運行,從而避免了HMaster的單點問題。

    (3) 應用舉例

    集群監(jiān)控:

    應用集群中,我們常常需要讓每一個機器知道集群中或依賴的其他某一個集群中哪些機器是活著的,并且在集群機器因為宕機,網(wǎng)絡斷鏈等原因能夠不在人工介入的情況下迅速通知到每一個機器,Zookeeper 能夠很容易的實現(xiàn)集群管理的功能,如有多臺 Server 組成一個服務集群,那么必須要一個"總管"知道當前集群中每臺機器的服務狀態(tài),一旦有機器不能提供服務,集群中其它集群必須知道,從而做出調(diào)整重新分配服務策略。同樣當增加集群的服務能力時,就會增加一臺或多臺 Server,同樣也必須讓"總管"知道,這就是ZooKeeper的集群監(jiān)控功能。

    圖2.4 集群管理結(jié)構圖


    比如我在zookeeper服務器端有一個znode/Configuration,那么集群中每一個機器啟動的時候都去這個節(jié)點下創(chuàng)建一個EPHEMERAL類型的節(jié)點,比如server1創(chuàng)建/Configuration?/Server1server2創(chuàng)建/Configuration?/Server1,然后Server1Server2watch /Configuration 這個父節(jié)點,那么也就是這個父節(jié)點下數(shù)據(jù)或者子節(jié)點變化都會通知對該節(jié)點進行watch的客戶端。因為EPHEMERAL類型節(jié)點有一個很重要的特性,就是客戶端和服務器端連接斷掉或者session過期就會使節(jié)點消失,那么在某一個機器掛掉或者斷鏈的時候,其對應的節(jié)點就會消
    失,然后集群中所有對/Configuration進行watch的客戶端都會收到通知,然后取得最新列表即可。

    Master選舉:

    Zookeeper 不僅能夠維護當前的集群中機器的服務狀態(tài),而且能夠選出一個"總管",讓這個總管來管理集群,這就是 Zookeeper 的另一個功能 Leader Election。Zookeeper 如何實現(xiàn) Leader Election,也就是選出一個 Master Server。和前面的一樣每臺 Server 創(chuàng)建一個 EPHEMERAL 目錄節(jié)點,不同的是它還是一個 SEQUENTIAL 目錄節(jié)點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節(jié)點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節(jié)點,是因為我們可以給每臺 Server 編號,我們可以選擇當前是最小編號的 Server Master,假如這個最小編號的 Server 死去,由于是 EPHEMERAL 節(jié)點,死去的 Server 對應的節(jié)點也被刪除,所以當前的節(jié)點列表中又出現(xiàn)一個最小編號的節(jié)點,我們就選擇這個節(jié)點為當前 Master。這樣就實現(xiàn)了動態(tài)選擇 Master,避免了傳統(tǒng)意義上單 Master 容易出現(xiàn)單點故障的問題。

    清單 3 Leader Election代碼

  • package?org.zk.leader.election;

  • import?org.apache.log4j.Logger;
  • import?org.apache.zookeeper.CreateMode;
  • import?org.apache.zookeeper.KeeperException;
  • import?org.apache.zookeeper.WatchedEvent;
  • import?org.apache.zookeeper.ZooDefs;
  • import?org.apache.zookeeper.data.Stat;

  • import?java.net.InetAddress;
  • import?java.net.UnknownHostException;

  • /**
  • ?* LeaderElection
  • ?* <p/>
  • ?* Author By: sunddenly工作室
  • ?* Created Date: 2014-11-13
  • ?*/
  • public?class?LeaderElection?extends?TestMainClient {
  • ????public?static?final?Logger logger = Logger.getLogger(LeaderElection.class);

  • ????public?LeaderElection(String connectString, String root) {
  • ????????super(connectString);
  • ????????this.root = root;
  • ????????if?(zk !=?null) {
  • ????????????try?{
  • ????????????????Stat s = zk.exists(root,?false);
  • ????????????????if?(s ==?null) {
  • ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  • ????????????????}
  • ????????????}?catch?(KeeperException e) {
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e) {
  • ????????????????logger.error(e);
  • ????????????}
  • ????????}
  • ????}

  • ????void?findLeader()?throws?InterruptedException, UnknownHostException, KeeperException {
  • ????????byte[] leader =?null;
  • ????????try?{
  • ????????????leader = zk.getData(root + "/leader",?true,?null);
  • ????????}?catch?(KeeperException e) {
  • ????????????if?(e?instanceof?KeeperException.NoNodeException) {
  • ????????????????logger.error(e);
  • ????????????}?else?{
  • ????????????????throw?e;
  • ????????????}
  • ????????}
  • ????????if?(leader !=?null) {
  • ????????????following();
  • ????????}?else?{
  • ????????????String newLeader =?null;
  • ????????????byte[] localhost = InetAddress.getLocalHost().getAddress();
  • ????????????try?{
  • ????????????????newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  • ????????????}?catch?(KeeperException e) {
  • ????????????????if?(e?instanceof?KeeperException.NodeExistsException) {
  • ????????????????????logger.error(e);
  • ????????????????}?else?{
  • ????????????????????throw?e;
  • ????????????????}
  • ????????????}
  • ????????????if?(newLeader !=?null) {
  • ????????????????leading();
  • ????????????}?else?{
  • ????????????????mutex.wait();
  • ????????????}
  • ????????}
  • ????}

  • ????@Override
  • ????public?void?process(WatchedEvent event) {
  • ????????if?(event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
  • ????????????System.out.println("得到通知");
  • ????????????super.process(event);
  • ????????????following();
  • ????????}
  • ????}

  • ????void?leading() {
  • ????????System.out.println("成為領導者");
  • ????}

  • ????void?following() {
  • ????????System.out.println("成為組成員");
  • ????}

  • ????public?static?void?main(String[] args) {
  • ????????String connectString = "localhost:2181";

  • ????????LeaderElection le =?new?LeaderElection(connectString, "/GroupMembers");
  • ????????try?{
  • ????????????le.findLeader();
  • ????????}?catch?(Exception e) {
  • ????????????logger.error(e);
  • ????????}
  • ????}
  • }
  • 2.6 隊列管理

    Zookeeper 可以處理兩種類型的隊列:

    ?當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列。

    ?隊列按照 FIFO 方式進行入隊和出隊操作,例如實現(xiàn)生產(chǎn)者消費者模型。

    (1)?同步隊列用 Zookeeper 實現(xiàn)的實現(xiàn)思路如下:

    創(chuàng)建一個父目錄 /synchronizing,每個成員都監(jiān)控標志(Set Watch)位目錄 /synchronizing/start 是否存在,然后每個成員都加入這個隊列,加入隊列的方式就是創(chuàng)建 /synchronizing/member_i 的臨時目錄節(jié)點,然后每個成員獲取 / synchronizing 目錄的所有目錄節(jié)點,也就是 member_i。判斷 i 的值是否已經(jīng)是成員的個數(shù),如果小于成員個數(shù)等待 /synchronizing/start 的出現(xiàn),如果已經(jīng)相等就創(chuàng)建 /synchronizing/start。

    用下面的流程圖更容易理解:

    圖 2.5 同步隊列流程圖


    清單 4 Synchronizing 代碼

  • package?org.zk.queue;

  • import?java.net.InetAddress;
  • import?java.net.UnknownHostException;
  • import?java.util.List;

  • import?org.apache.log4j.Logger;
  • import?org.apache.zookeeper.CreateMode;
  • import?org.apache.zookeeper.KeeperException;
  • import?org.apache.zookeeper.WatchedEvent;
  • import?org.apache.zookeeper.Watcher;
  • import?org.apache.zookeeper.ZooKeeper;
  • import?org.apache.zookeeper.ZooDefs.Ids;
  • import?org.apache.zookeeper.data.Stat;
  • import?org.zk.leader.election.TestMainClient;

  • /**
  • ?* Synchronizing
  • ?* <p/>
  • ?* Author By: sunddenly工作室
  • ?* Created Date: 2014-11-13
  • ?*/
  • public?class?Synchronizing?extends?TestMainClient {
  • ????int?size;
  • ????String name;
  • ????public?static?final?Logger logger = Logger.getLogger(Synchronizing.class);

  • ????/**
  • ?????*?構造函數(shù)
  • ?????*
  • ?????* @param connectString?服務器連接
  • ?????* @param root?根目錄
  • ?????* @param size?隊列大小
  • ?????*/
  • ????Synchronizing(String connectString, String root,?int?size) {
  • ????????super(connectString);
  • ????????this.root = root;
  • ????????this.size = size;

  • ????????if?(zk !=?null) {
  • ????????????try?{
  • ????????????????Stat s = zk.exists(root,?false);
  • ????????????????if?(s ==?null) {
  • ????????????????????zk.create(root,?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  • ????????????????}
  • ????????????}?catch?(KeeperException e) {
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e) {
  • ????????????????logger.error(e);
  • ????????????}
  • ????????}
  • ????????try?{
  • ????????????name =?new?String(InetAddress.getLocalHost().getCanonicalHostName().toString());
  • ????????}?catch?(UnknownHostException e) {
  • ????????????logger.error(e);
  • ????????}

  • ????}

  • ????/**
  • ?????*?加入隊列
  • ?????*
  • ?????* @return
  • ?????* @throws KeeperException
  • ?????* @throws InterruptedException
  • ?????*/

  • ????void?addQueue()?throws?KeeperException, InterruptedException{
  • ????????zk.exists(root + "/start",true);
  • ????????zk.create(root + "/" + name,?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  • ????????synchronized?(mutex) {
  • ????????????List<String> list = zk.getChildren(root,?false);
  • ????????????if?(list.size() < size) {
  • ????????????????mutex.wait();
  • ????????????}?else?{
  • ????????????????zk.create(root + "/start",?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  • ????????????}
  • ????????}
  • ????}

  • ????@Override
  • ????public?void?process(WatchedEvent event) {
  • ????????if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
  • ????????????System.out.println("得到通知");
  • ????????????super.process(event);
  • ????????????doAction();
  • ????????}
  • ????}

  • ????/**
  • ?????*?執(zhí)行其他任務
  • ?????*/
  • ????private?void?doAction(){
  • ????????System.out.println("同步隊列已經(jīng)得到同步,可以開始執(zhí)行后面的任務了");
  • ????}

  • ????public?static?void?main(String args[]) {
  • ????????//啟動Server
  • ????????String connectString = "localhost:2181";
  • ????????int?size = 1;
  • ????????Synchronizing b =?new?Synchronizing(connectString, "/synchronizing", size);
  • ????????try{
  • ????????????b.addQueue();
  • ????????}?catch?(KeeperException e){
  • ????????????logger.error(e);
  • ????????}?catch?(InterruptedException e){
  • ????????????logger.error(e);
  • ????????}
  • ????}
  • }
  • (2)?FIFO 隊列用 Zookeeper 實現(xiàn)思路如下:

    實現(xiàn)的思路也非常簡單,就是在特定的目錄下創(chuàng)建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊列時都是有編號的,出隊列時通過 getChildren( ) 方法可以返回當前所有的隊列中的元素,然后消費其中最小的一個,這樣就能保證 FIFO

    下面是生產(chǎn)者和消費者這種隊列形式的示例代碼

    清單 5 FIFOQueue 代碼

  • import?org.apache.log4j.Logger;
  • import?org.apache.zookeeper.CreateMode;
  • import?org.apache.zookeeper.KeeperException;
  • import?org.apache.zookeeper.WatchedEvent;
  • import?org.apache.zookeeper.ZooDefs;
  • import?org.apache.zookeeper.data.Stat;

  • import?java.nio.ByteBuffer;
  • import?java.util.List;

  • /**
  • ?* FIFOQueue
  • ?* <p/>
  • ?* Author By: sunddenly工作室
  • ?* Created Date: 2014-11-13
  • ?*/
  • public?class?FIFOQueue?extends?TestMainClient{
  • ????public?static?final?Logger logger = Logger.getLogger(FIFOQueue.class);

  • ????/**
  • ?????* Constructor
  • ?????*
  • ?????* @param connectString
  • ?????* @param root
  • ?????*/
  • ????FIFOQueue(String connectString, String root) {
  • ????????super(connectString);
  • ????????this.root = root;
  • ????????if?(zk !=?null) {
  • ????????????try?{
  • ????????????????Stat s = zk.exists(root,?false);
  • ????????????????if?(s ==?null) {
  • ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  • ????????????????}
  • ????????????}?catch?(KeeperException e) {
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e) {
  • ????????????????logger.error(e);
  • ????????????}
  • ????????}
  • ????}
  • ????/**
  • ?????*?生產(chǎn)者
  • ?????*
  • ?????* @param i
  • ?????* @return
  • ?????*/

  • ????boolean?produce(int?i)?throws?KeeperException, InterruptedException{
  • ????????ByteBuffer b = ByteBuffer.allocate(4);
  • ????????byte[] value;
  • ????????b.putInt(i);
  • ????????value = b.array();
  • ????????zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  • ????????????????????CreateMode.PERSISTENT_SEQUENTIAL);
  • ????????return?true;
  • ????}


  • ????/**
  • ?????*?消費者
  • ?????*
  • ?????* @return
  • ?????* @throws KeeperException
  • ?????* @throws InterruptedException
  • ?????*/
  • ????int?consume()?throws?KeeperException, InterruptedException{
  • ????????int?retvalue = -1;
  • ????????Stat stat =?null;
  • ????????while?(true) {
  • ????????????synchronized?(mutex) {
  • ????????????????List<String> list = zk.getChildren(root,?true);
  • ????????????????if?(list.size() == 0) {
  • ????????????????????mutex.wait();
  • ????????????????}?else?{
  • ????????????????????Integer min =?new?Integer(list.get(0).substring(7));
  • ????????????????????for(String s : list){
  • ????????????????????????Integer tempValue =?new?Integer(s.substring(7));
  • ????????????????????????if(tempValue < min) min = tempValue;
  • ????????????????????}
  • ????????????????????byte[] b = zk.getData(root + "/element" + min,false, stat);
  • ????????????????????zk.delete(root + "/element" + min, 0);
  • ????????????????????ByteBuffer buffer = ByteBuffer.wrap(b);
  • ????????????????????retvalue = buffer.getInt();
  • ????????????????????return?retvalue;
  • ????????????????}
  • ????????????}
  • ????????}
  • ????}

  • ????@Override
  • ????public?void?process(WatchedEvent event) {
  • ????????super.process(event);
  • ????}

  • ????public?static?void?main(String args[]) {
  • ????????//啟動Server
  • ????????TestMainServer.start();
  • ????????String connectString = "localhost:"+TestMainServer.CLIENT_PORT;

  • ????????FIFOQueue q =?new?FIFOQueue(connectString, "/app1");
  • ????????int?i;
  • ????????Integer max =?new?Integer(5);

  • ????????System.out.println("Producer");
  • ????????for?(i = 0; i < max; i++)
  • ????????????try{
  • ????????????????q.produce(10 + i);
  • ????????????}?catch?(KeeperException e){
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e){
  • ????????????????logger.error(e);
  • ????????????}

  • ????????for?(i = 0; i < max; i++) {
  • ????????????try{
  • ????????????????int?r = q.consume();
  • ????????????????System.out.println("Item:?" + r);
  • ????????????}?catch?(KeeperException e){
  • ????????????????i--;
  • ????????????????logger.error(e);
  • ????????????}?catch?(InterruptedException e){
  • ????????????????logger.error(e);
  • ????????????}
  • ????????}

  • ????}
  • }
  • 三、ZooKeeper實際應用

    假設我們的集群有:

    (1)?20個搜索引擎的服務器:每個負責總索引中的一部分的搜索任務。

    ?搜索引擎的服務器中的15服務器現(xiàn)在提供搜索服務

    ?5服務器正在生成索引

    20個搜索引擎的服務器,經(jīng)常要讓正在提供搜索服務的服務器停止提供服務開始生成索引,生成索引的服務器已經(jīng)把索引生成完成可以搜索提供服務了。

    (2)?一個總服務器:負責向這20個搜索引擎的服務器發(fā)出搜索請求并合并結(jié)果集。

    (3)?一個備用的總服務器:負責當總服務器宕機時替換總服務器。

    (4)?一個webcgi:向總服務器發(fā)出搜索請求。

    使用Zookeeper可以保證:

    (1)?總服務器:自動感知有多少提供搜索引擎的服務器,并向這些服務器發(fā)出搜索請求。

    (2)?備用的總服務器:宕機時自動啟用備用的總服務器。

    (3)?webcgi能夠自動地獲知總服務器的網(wǎng)絡地址變化。

    (4) 實現(xiàn)如下:

    ?提供搜索引擎的服務器都在Zookeeper中創(chuàng)建znodezk.create("/search/nodes/node1",?"hostname".getBytes(),?Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);


    總服務器可以從Zookeeper中獲取一個znode的子節(jié)點的列表,zk.getChildren("/search/nodes", true);

    ?總服務器遍歷這些子節(jié)點,并獲取子節(jié)點的數(shù)據(jù)生成提供搜索引擎的服務器列表

    ?當總服務器接收到子節(jié)點改變的事件信息,重新返回第二步;

    ?總服務器Zookeeper中創(chuàng)建節(jié)點,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);


    備用的總服務器監(jiān)控Zookeeper中的"/search/master"節(jié)點。當這個znode的節(jié)點數(shù)據(jù)改變時,把自己啟動變成總服務器,并把自己的網(wǎng)絡地址數(shù)據(jù)放進這個節(jié)點。

    ?webcgiZookeeper"/search/master"節(jié)點獲取總服務器的網(wǎng)絡地址數(shù)據(jù),并向其發(fā)送搜索請求。

    ?webcgi監(jiān)控Zookeeper中的"/search/master"節(jié)點,當這個znode的節(jié)點數(shù)據(jù)改變時,從這個節(jié)點獲取總服務器的網(wǎng)絡地址數(shù)據(jù),并改變當前的總服務器的網(wǎng)絡地址。
    《新程序員》:云原生和全面數(shù)字化實踐50位技術專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

    總結(jié)

    以上是生活随笔為你收集整理的分布式服务框架 Zookeeper — 管理分布式环境中的数据的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。