服务注册与发现框架discovery源码解析
discovery是B站開源的類Eurekad的一款服務注冊與發現框架,簡單介紹如下:
1. 實現AP類型服務注冊發現系統,在可用性極極極極強的情況下,努力保證數據最終一致性 2. 與公司k8s平臺深度結合,注冊打通、發布平滑、naming service等等 3. 網絡閃斷等異常情況,可自我保護,保證每個節點可用 4. 基于HTTP協議實現接口,簡單易用,維護各流行語言SDK## 相對Netflix Eureka的改進* 長輪詢監聽應用變更(Eureka定期30s拉取一次) * 只拉取感興趣的AppID實例(Eureka一拉就是全部,無法區分) * 合并node之間的同步請求/(ㄒoㄒ)/~~其實還沒實現,是個TODO * Dashboard騷操作~ * 多注冊中心信息同步支持 * 更完善的日志記錄下面簡單就discovery的源碼進項分析,首先先熟悉一下框架的一些基本概念:
0. 通過AppID(服務名)和hostname定位實例 1. Node: discovery server節點 2. Provider: 服務提供者,目前托管給k8s平臺,容器啟動后發起register請求給Discover server,后定期(30s)心跳一次 3. Consumer: 啟動時拉取node節點信息,后隨機選擇一個node發起long polling(30s一次)拉取服務instances列表 4. Instance: 保存在node內存中的AppID對應的容器節點信息,包含hostname/ip/service等比較重要的一些特色是:
1. 心跳復制(Peer to Peer),數據一致性的保障:* AppID注冊時根據當前時間生成dirtyTimestamp,nodeA向nodeB同步(register)時,nodeB可能有以下兩種情況:* 返回-404 則nodeA攜帶dirtyTimestamp向nodeB發起注冊請求,把最新信息同步:1. nodeB中不存在實例2. nodeB中dirtyTimestamp較小* 返回-409 nodeB不同意采納nodeA信息,且返回自身信息,nodeA使用該信息更新自身* AppID注冊成功后,Provider每(30s)發起一次heartbeat請求,處理流程如上 2. Instance管理* 正常檢測模式,隨機分批踢掉無心跳Instance節點,盡量避免單應用節點被一次全踢* 網絡閃斷和分區時自我保護模式* 60s內丟失大量(小于Instance總數*2*0.85)心跳數,“好”“壞”Instance信息都保留* 所有node都會持續提供服務,單個node的注冊和發現功能不受影響* 最大保護時間,防止分區恢復后大量原先Instance真的已經不存在時,一直處于保護模式 3. Consumer客戶端* 長輪詢+node推送,服務發現準實時* 訂閱式,只需要關注想要關注的AppID的Instance列表變化* 緩存實例Instance列表信息,保證與node網絡不通等無法訪問到node情況時原先的Instance可用下面開始進行源碼分析:
discovery的入口文件非常簡單:
func main() {//解析配置文件flag.Parse()if err := conf.Init(); err != nil {log.Error("conf.Init() error(%v)", err)panic(err)}fmt.Println("conf", conf.Conf)log.Init(conf.Conf.Log)//開始一個新的discovery中心dis, cancel := discovery.New(conf.Conf)//本地實例的http監聽端口,提供了一系列的http接口,比如注冊,更新和下線接口等http.Init(conf.Conf, dis)// init signalc := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)for {s := <-clog.Info("discovery get a signal %s", s.String())switch s {//如果監聽到停止信號,則進行收尾工作處理,在cancel函數中具體說明case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:cancel()time.Sleep(time.Second)log.Info("discovery quit !!!")returncase syscall.SIGHUP:default:return}} }其中最核心的函數是dis, cancel := discovery.New(conf.Conf),下面詳細分析這個函數:
// New get a discovery. func New(c *conf.Config) (d *Discovery, cancel context.CancelFunc) {//構建一個新的Discoveryd = &Discovery{protected: c.EnableProtect,c: c,client: http.NewClient(c.HTTPClient),//初始化服務注冊中心,并開始協程進行保護模式下的邏輯處理registry: registry.NewRegistry(c),}//讀取配置文件中的zone和node相關的配置讀取出來,返回Nodes結構體// Nodes is helper to manage lifecycle of a collection of Nodes.//type Nodes struct {// nodes []*Node //Node切片數組// zones map[string][]*Node// selfAddr string //本地http監聽的地址//}d.nodes.Store(registry.NewNodes(c))//將除了本地http監聽端口的host之外其他的host,拉取出其他host的所有實例,在本地進行注冊d.syncUp()//注冊自己本身,并每隔30s進行一次心跳檢測cancel = d.regSelf()//開始協程進行長輪訓,實現服務注冊進來后準實時發現go d.nodesproc()//如果成功進行了兩輪renew循環,則關閉保護模式//保護模式下不再接受其他Discovery中心的拉取和推送請求,之前提供的http接口也失效,但是本地Discovery還是可以正常提供服務注冊go d.exitProtect()return }這個函數中包括了一系列的邏輯,先來看d.syncUp()函數
// syncUp populates the registry information from a peer eureka node. func (d *Discovery) syncUp() {nodes := d.nodes.Load().(*registry.Nodes)//將配置文件中的所有node拿出來,循環處理for _, node := range nodes.AllNodes() {log.Info("syncUp nodes are %v", node)//如果是自己本身,則跳過if nodes.Myself(node.Addr) {continue}uri := fmt.Sprintf(_fetchAllURL, node.Addr)var res struct {Code int `json:"code"`Data map[string][]*model.Instance `json:"data"`}//獲取其他節點的所有instance實例,如果其他節點沒起來或者fetch的時候發生異常,則跳過這個節點的處理if err := d.client.Get(context.TODO(), uri, "", nil, &res); err != nil {log.Error("d.client.Get(%v) error(%v)", uri, err)continue}log.Info("fetch res are %v", res, res.Code)if res.Code != 0 {log.Error("service syncup from(%s) failed ", uri)continue}// sync success from other node,exit protected mode//如果能正常從其他節點拉取到他們的實例信息,則表示節點之間的通訊是正常的,如果配置文件中是開啟保護模式的話,這個時候就可以關閉保護模式了d.protected = falsefor _, is := range res.Data {for _, i := range is {//將每個節點中的實例注冊到本地_ = d.registry.Register(i, i.LatestTimestamp)}}// NOTE: no return, make sure that all instances from other nodes register into self.}//將本地node狀態置為正常nodes.UP() }上面的流程主要是將其他節點的實例拉過來,然后注冊到本地,所以下面我們來看本地注冊干了些啥
/ Register a new instance. func (r *Registry) Register(ins *model.Instance, latestTime int64) (err error) {//初始化一個APP實例,APP的結構在這里放一下,方便后面的理解/*// App Instances distinguished by hostnametype App struct {AppID stringZone stringinstances map[string]*InstancelatestTimestamp int64 lock sync.RWMutex}*/a := r.newApp(ins)//將傳入的ins傳入后,copy后返回一個全新的instance,類似于快照,記錄這個時刻的insi, ok := a.NewInstance(ins, latestTime)if ok {//如果注冊成功,則將expPerMin+2(+2是因為每分鐘discovery會renew兩次,也計算出相應的expThreshold,方便后面的保護模式的計算)r.gd.incrExp()}// NOTE: make sure free poll before update appid latest timestamp.//既然有新的實例注冊進來了,當然要廣播出去,讓本地去更新實例緩存了r.broadcast(i.Env, i.AppID)return }先來看r.newApp(ins)這個函數
func (r *Registry) newApp(ins *model.Instance) (a *model.App) {//先實例化一個appsas, _ := r.newapps(ins.AppID, ins.Env)//然后開始搞一個新的App實例a, _ = as.NewApp(ins.Zone, ins.AppID, ins.LatestTimestamp)return }func (r *Registry) newapps(appid, env string) (a *model.Apps, ok bool) {//key是appid-env的stringkey := appsKey(appid, env)r.aLock.Lock()//先看下這個key是否有了Apps了/*// Apps app distinguished by zonetype Apps struct {apps map[string]*Applock sync.RWMutexlatestTimestamp int64}*/if a, ok = r.appm[key]; !ok {a = model.NewApps()r.appm[key] = a}r.aLock.Unlock()return }// NewApp news a app by appid. If ok=false, returns the app of already exist. func (p *Apps) NewApp(zone, appid string, lts int64) (a *App, new bool) {p.lock.Lock()a, ok := p.apps[zone]if !ok {a = NewApp(zone, appid)p.apps[zone] = a}if lts <= p.latestTimestamp {// insure increaselts = p.latestTimestamp + 1}//注意這里,在注冊的邏輯里,如果有新的實例注冊進來的話,latestTimestamp這個字段是有更新的//正常情況下,會用注冊的實例的注冊時間進行更新p.latestTimestamp = ltsp.lock.Unlock()new = !okreturn }通過上面返回的App實例,初始化Instance信息
// NewInstance new a instance. func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {i = new(Instance)a.lock.Lock()oi, ok := a.instances[ni.Hostname]if ok {ni.UpTimestamp = oi.UpTimestampif ni.DirtyTimestamp < oi.DirtyTimestamp {log.Warn("register exist(%v) dirty? timestamp over than caller(%v)", oi, ni)ni = oi}}a.instances[ni.Hostname] = nia.updateLatest(latestTime)*i = *nia.lock.Unlock()ok = !okreturn }實例已經注冊好了,下面就廣播出去吧
// broadcast on poll by chan. // NOTE: make sure free poll before update appid latest timestamp. func (r *Registry) broadcast(env, appid string) {key := pollKey(env, appid)r.cLock.Lock()conns, ok := r.conns[key]//如果是剛啟動的時候將其他節點的實例注冊進來的話,這里是空的,在這里直接返回fmt.Println("conns", conns)if !ok {fmt.Println("no co")r.cLock.Unlock()return} 第一次廣播的時候,直接返回 ....... }至此,其他節點的處理流程就處理完了,下面我們看下將自己本身注冊進來
func (d *Discovery) regSelf() context.CancelFunc {ctx, cancel := context.WithCancel(context.Background())now := time.Now().UnixNano()ins := &model.Instance{Region: d.c.Env.Region,Zone: d.c.Env.Zone,Env: d.c.Env.DeployEnv,Hostname: d.c.Env.Host,AppID: model.AppID,Addrs: []string{"http://" + d.c.HTTPServer.Addr,},Status: model.InstanceStatusUP,RegTimestamp: now,UpTimestamp: now,LatestTimestamp: now,RenewTimestamp: now,DirtyTimestamp: now,}//將自己注冊進來,并將自身的實例信息同步到其他的節點進行注冊d.Register(ctx, ins, now, false, false)go func() {ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:arg := &model.ArgRenew{AppID: ins.AppID,Zone: d.c.Env.Zone,Env: d.c.Env.DeployEnv,Hostname: d.c.Env.Host,}//每隔30秒renew一下if _, err := d.Renew(ctx, arg); err != nil && err == ecode.NothingFound {log.Info("renew self err is %v", err)d.Register(ctx, ins, now, false, false)}case <-ctx.Done():arg := &model.ArgCancel{AppID: model.AppID,Zone: d.c.Env.Zone,Env: d.c.Env.DeployEnv,Hostname: d.c.Env.Host,}if err := d.Cancel(context.Background(), arg); err != nil {log.Error("d.Cancel(%+v) error(%v)", arg, err)}return}}}()return cancel }?這里的主要邏輯是將自身注冊進來,并每隔30秒renew一次,將自己的信息同步到其他節點,下面分析一下renew的邏輯
// Renew marks the given instance of the given app name as renewed, and also marks whether it originated from replication. func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Instance, err error) {log.Info("renew args are %v", arg)//獲取自身實例信息i, ok := d.registry.Renew(arg)if !ok {err = ecode.NothingFoundlog.Error("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env)return}//同步信息到其他節點(如果需要的話,這種情況下同步后直接退出了)if !arg.Replication {_ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Renew, i, arg.Zone != d.c.Env.Zone)return}//如果renew的DirtyTimestamp大于實例的DirtyTimestamp,返回-404,滿足如下條件中的第二種條件// * 返回-404 則nodeA攜帶dirtyTimestamp向nodeB發起注冊請求,把最新信息同步:// 1. nodeB中不存在實例// 2. nodeB中dirtyTimestamp較小if arg.DirtyTimestamp > i.DirtyTimestamp {err = ecode.NothingFound} else if arg.DirtyTimestamp < i.DirtyTimestamp {err = ecode.Conflict}return }//同步邏輯 // Replicate replicate information to all nodes except for this node. func (ns *Nodes) Replicate(c context.Context, action model.Action, i *model.Instance, otherZone bool) (err error) {log.Warn("nodes is %v,len is %v", ns.nodes, len(ns.nodes))if len(ns.nodes) == 0 {return}eg, c := errgroup.WithContext(c)for _, n := range ns.nodes {log.Warn("Replicate node is %v", n)//將自身實例同步到其他節點if !ns.Myself(n.addr) {ns.action(c, eg, action, n, i)}}if !otherZone {for _, zns := range ns.zones {if n := len(zns); n > 0 {ns.action(c, eg, action, zns[rand.Intn(n)], i)}}}err = eg.Wait()return }//action函數具體邏輯 func (ns *Nodes) action(c context.Context, eg *errgroup.Group, action model.Action, n *Node, i *model.Instance) {log.Info("action arg is %v", i)switch action {case model.Register:eg.Go(func() error {_ = n.Register(c, i)return nil})case model.Renew://開啟協程去renew到其他節點eg.Go(func() error {_ = n.Renew(c, i)return nil})case model.Cancel:eg.Go(func() error {_ = n.Cancel(c, i)return nil})} }//n.Renew函數的邏輯 // Renew send the heartbeat information of Instance receiving by this node to the peer node represented. // If the instance does not exist the node, the instance registration information is sent again to the peer node. func (n *Node) Renew(c context.Context, i *model.Instance) (err error) {var res *model.Instanceerr = n.call(c, model.Renew, i, n.renewURL, &res)log.Info("renew other node info are %v,url is %v,res is %v", i, n.renewURL, err)if err == ecode.ServerErr {log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)n.status = model.NodeStatusLostreturn}n.status = model.NodeStatusUPif err == ecode.NothingFound {log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)err = n.call(c, model.Register, i, n.registerURL, nil)return}// NOTE: register response instance whitch in conflict with peer nodeif err == ecode.Conflict && res != nil {err = n.call(c, model.Register, res, n.pRegisterURL, nil)}return }//重點在于n.call(c, model.Renew, i, n.renewURL, &res) func (n *Node) call(c context.Context, action model.Action, i *model.Instance, uri string, data interface{}) (err error) {params := url.Values{}params.Set("region", i.Region)params.Set("zone", i.Zone)params.Set("env", i.Env)params.Set("appid", i.AppID)params.Set("hostname", i.Hostname)params.Set("from_zone", "true")//同步到其他節點的時候,一般是走入為true的邏輯分支if n.otherZone {params.Set("replication", "false")} else {params.Set("replication", "true")}switch action {case model.Register:params.Set("addrs", strings.Join(i.Addrs, ","))params.Set("status", strconv.FormatUint(uint64(i.Status), 10))params.Set("version", i.Version)meta, _ := json.Marshal(i.Metadata)params.Set("metadata", string(meta))params.Set("reg_timestamp", strconv.FormatInt(i.RegTimestamp, 10))params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))case model.Renew:params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))case model.Cancel:params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))}var res struct {Code int `json:"code"`Data json.RawMessage `json:"data"`}//請求其他節點的renewUrl(http接口)if err = n.client.Post(c, uri, "", params, &res); err != nil {log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err)return}if res.Code != 0 {log.Error("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code)if err = ecode.Int(res.Code); err == ecode.Conflict {_ = json.Unmarshal([]byte(res.Data), data)}}return }renew的時候的邏輯是根據renew的參數找到自身實例,再將自身信息同步到其他節點
重點講解一下renew的邏輯:
在將自身信息注冊到其他的discovery之后,會有兩種情況發生
1.其他節點向本地發起renew請求
先是從本地緩存中查找到renew請求arg中的zone,env和appid找到之前注冊在本地的app信息,再通過arg中的host找到app中的instances(map)中的instance信息(在這個過程中會更新實例的renewTimestamp),正常情況下就沒有其他的邏輯了,因為其他的分支都進不去
2.本地節點更新
每隔30秒本地會拿著conf文件中的zone,env,host和在本地注冊時候的AppID作為參數進行renew,
也是先找出本地緩存中的實例信息,然后同步給其他node(http POST請求其他discovery節點的接口地址),在同步其他節點的時候,會從zones(初始化注冊時候的其他zone的節點信息)切片中隨機選取一個node進行同步
上述注冊其他節點和自身節點完成后,就是一個長輪訓,實現服務發現準實時
func (d *Discovery) nodesproc() {var (lastTs int64)for {arg := &model.ArgPolls{AppID: []string{model.AppID},Env: d.c.Env.DeployEnv,Hostname: d.c.Env.Host,LatestTimestamp: []int64{lastTs},}log.Info("polls times is %v", time.Now().Format("2006-01-02 15:04:05"))//返回一個消費者chanch, _, _, err := d.registry.Polls(arg)if err != nil && err != ecode.NotModified {log.Error("d.registry(%v) error(%v)", arg, err)time.Sleep(time.Second)continue}log.Info("wait for ch out")apps := <-chins, ok := apps[model.AppID]fmt.Println("go process ins", ins)if !ok || ins == nil {return}var (nodes []stringzones = make(map[string][]string))for _, ins := range ins.Instances {for _, in := range ins {log.Info("range ins are %v", in)for _, addr := range in.Addrs {u, err := url.Parse(addr)if err == nil && u.Scheme == "http" {if in.Zone == d.c.Env.Zone {nodes = append(nodes, u.Host)} else {zones[in.Zone] = append(zones[in.Zone], u.Host)}}}}}lastTs = ins.LatestTimestampc := new(conf.Config)*c = *d.cc.Nodes = nodesc.Zones = zonesns := registry.NewNodes(c)ns.UP()d.nodes.Store(ns)log.Info("discovery changed nodes:%v zones:%v", nodes, zones)} }這個準實時的實現我的理解是:
1.在d.registry.Polls(arg)方法調用后會返回一個消費者channel,然后會在for循環中會阻塞在這個channel里,一旦有新的節點注冊進來的時候(我們可以看一下,新實例注冊的時候,會進行廣播通知,這里有個
會往這個channel中寫入新注冊的instance信息。)就會立馬收到注冊信息,然后更新本地實例緩存,做到服務發現近乎實時。
還有discovery的自保護模式,在下一篇中進行分析了。?
總結
以上是生活随笔為你收集整理的服务注册与发现框架discovery源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux内核启动文档翻译(i386)
- 下一篇: rtsp的移植