日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > windows >内容正文

windows

Kubernetes: client-go 源码剖析(一)

發(fā)布時(shí)間:2023/12/24 windows 37 coder
生活随笔 收集整理的這篇文章主要介紹了 Kubernetes: client-go 源码剖析(一) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

0. 前言

在看 kube-scheduler 組件的過(guò)程中遇到了 kube-scheduler 對(duì)于 client-go 的調(diào)用,泛泛的理解調(diào)用過(guò)程總有種隔靴搔癢的感覺(jué),于是調(diào)轉(zhuǎn)頭先把 client-go 理清楚在回來(lái)看 kube-scheduler

為什么要看 client-go,并且要深入到原理,源碼層面去看。很簡(jiǎn)單,因?yàn)樗苤匾V匾趦煞矫妫?/p>

  1. kubernetes 組件通過(guò) client-gokube-apiserver 交互。
  2. client-go 簡(jiǎn)單,易用,大部分基于 Kubernetes 做二次開(kāi)發(fā)的應(yīng)用,在和 kube-apiserver 交互時(shí)會(huì)使用 client-go

當(dāng)然,不僅在于使用,理解層面,對(duì)于我們學(xué)習(xí)代碼開(kāi)發(fā),架構(gòu)等也有幫助。

1. client-go 客戶(hù)端對(duì)象

client-go 支持四種客戶(hù)端對(duì)象,分別是 RESTClientClientSetDynamicClientDiscoveryClient

組件或者二次開(kāi)發(fā)的應(yīng)用可以通過(guò)這四種客戶(hù)端對(duì)象和 kube-apiserver 交互。其中,RESTClient 是最基礎(chǔ)的客戶(hù)端對(duì)象,它封裝了 HTTP Request,實(shí)現(xiàn)了 RESTful 風(fēng)格的 APIClientSet 基于 RESTClient,封裝了對(duì)于 ResourceVersion 的請(qǐng)求方法。DynamicClient 相比于 ClientSet 提供了全資源,包括自定義資源的請(qǐng)求方法。DiscoveryClient 用于發(fā)現(xiàn) kube-apiserver 支持的資源組,資源版本和資源信息。

每種客戶(hù)端適用的場(chǎng)景不同,主要是對(duì) HTTP Request 做了層層封裝,具體的代碼實(shí)現(xiàn)可參考 client-go 客戶(hù)端對(duì)象。

2. informer 機(jī)制

僅僅封裝 HTTP Request 是不夠的,組件通過(guò) client-gokube-apiserver 交互,必然對(duì)實(shí)時(shí)性,可靠性等有很高要求。試想,如果 ETCD 中存儲(chǔ)的數(shù)據(jù)和組件通過(guò) client-goETCD 獲取的數(shù)據(jù)不匹配的話(huà),那將會(huì)是一個(gè)非常嚴(yán)重的問(wèn)題。

如何實(shí)現(xiàn) client-go 的實(shí)時(shí)性,可靠性?client-go 給出的答案是:informer 機(jī)制。

? ? ? ? ? ? ? ? client-go informer 流程圖

informer 機(jī)制的核心組件包括:

  • Reflector: 主要負(fù)責(zé)兩類(lèi)任務(wù):
    1. 通過(guò) client-go 客戶(hù)端對(duì)象 list kube-apiserver 資源,并且 watch kube-apiserver 資源變更。
    2. 作為生產(chǎn)者,將獲取的資源放入 Delta FIFO 隊(duì)列。
  • Informer: 主要負(fù)責(zé)三類(lèi)任務(wù):
    1. 作為消費(fèi)者,將 Reflector 放入隊(duì)列的資源拿出來(lái)。
    2. 將資源交給 indexer 組件。
    3. 交給 indexer 組件之后觸發(fā)回調(diào)函數(shù),處理回調(diào)事件。
  • Indexer: indexer 組件負(fù)責(zé)將資源信息存入到本地內(nèi)存數(shù)據(jù)庫(kù)(實(shí)際是 map 對(duì)象),該數(shù)據(jù)庫(kù)作為緩存存在,其資源信息和 ETCD 中的資源信息完全一致(得益于 watch 機(jī)制)。因此,client-go 可以從本地 indexer 中讀取相應(yīng)的資源,而不用每次都從 kube-apiserver 中獲取資源信息。這也實(shí)現(xiàn)了 client-go 對(duì)于實(shí)時(shí)性的要求。

接下來(lái)從源碼角度看各個(gè)組件的處理流程,力圖做到知其然,知其所以然。

2 informer 源碼分析

直接閱讀 informer 源碼是非常晦澀難懂的,這里通過(guò) informer 的代碼示例開(kāi)始學(xué)習(xí):

package main

import (
	"log"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 解析 kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

    // 創(chuàng)建 ClientSet 客戶(hù)端對(duì)象
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)

    // 創(chuàng)建 sharedInformers
	sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
    // 創(chuàng)建 informer
	informer := sharedInformers.Core().V1().Pods().Informer()

    // 創(chuàng)建 Event 回調(diào) handler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("Pod Deleted from Store: %s", mObj.GetName())
		},
	})

    // 運(yùn)行 informer
	informer.Run(stopCh)
}

執(zhí)行結(jié)果如下:

# go run informer.go 
2023/12/14 12:00:26 New Pod Added to Store: prometheus-alertmanager-0
2023/12/14 12:01:26 prometheus-alertmanager-0 Pod Updated to prometheus-alertmanager-0

上述代碼示例分為三部分:創(chuàng)建 informer,創(chuàng)建 informerEventHandler,運(yùn)行 informer。下面,通過(guò)這三部分流程介紹 client-go 的核心組件。

2.1 創(chuàng)建 informer

創(chuàng)建 informer 分為兩步。

1)創(chuàng)建工廠(chǎng) sharedInformerFactory

// sharedInformers factory 
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

// client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

sharedInformerFactory 實(shí)現(xiàn)了 SharedInformerFactory 接口,該工廠(chǎng)負(fù)責(zé)創(chuàng)建 informer

2)創(chuàng)建 informer

// 創(chuàng)建 informer
informer := sharedInformers.Core().V1().Pods().Informer()

// 調(diào)用 Core 方法
func (f *sharedInformerFactory) Core() core.Interface {
	return core.New(f, f.namespace, f.tweakListOptions)
}

func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// 調(diào)用 V1 方法
func (g *group) V1() v1.Interface {
	return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// 調(diào)用 Pods 方法
func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

經(jīng)過(guò)層層構(gòu)建創(chuàng)建 podInformer 對(duì)象,該對(duì)象實(shí)現(xiàn)了 PodInformer 接口,調(diào)用接口的 Informer 方法創(chuàng)建 informer 對(duì)象:

func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

podInformer.Informer 實(shí)際調(diào)用的是 sharedInformerFactory.InformerFor

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

    // 反射出資源對(duì)象 obj 的 type 
	informerType := reflect.TypeOf(obj)

    // 讀取并判斷資源對(duì)象的 informer
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	...

    // 調(diào)用 newFunc 創(chuàng)建 informer
	informer = newFunc(f.client, resyncPeriod)

    // 將 type:informer 加入到 factory 的 informers 中
	f.informers[informerType] = informer

	return informer
}

InformerFor 方法可以看出,sharedInformerFactory 的 share 體現(xiàn)在同一個(gè)資源類(lèi)型共享 informer

這么設(shè)計(jì)在于,每個(gè) informer 包括一個(gè) ReflectorReflector 通過(guò)訪(fǎng)問(wèn) kube-apiserver 實(shí)現(xiàn) ListAndWatch 操作。共享 informer 實(shí)際是共享 Reflector,這種共享機(jī)制將減少 Reflector 對(duì)于 kube-apiserver 的訪(fǎng)問(wèn),降低 kube-apiserver 的負(fù)載,節(jié)約資源。

繼續(xù)看,創(chuàng)建 informernewFunc 函數(shù)做了什么:

informer = newFunc(f.client, resyncPeriod)

// client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

newFunc 實(shí)際調(diào)用的是 NewFilteredPodInformer 函數(shù),在函數(shù)內(nèi)創(chuàng)建 cache.ListAndWatch 對(duì)象,對(duì)象中包括 ListFuncWatchFunc 回調(diào)函數(shù),回調(diào)函數(shù)內(nèi)調(diào)用 ClientSet 實(shí)現(xiàn) list 和 watch 資源對(duì)象。

繼續(xù)看 cache.NewSharedIndexInformer

// client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	return NewSharedIndexInformerWithOptions(
		lw,
		exampleObject,
		SharedIndexInformerOptions{
			ResyncPeriod: defaultEventHandlerResyncPeriod,
			Indexers:     indexers,
		},
	)
}

func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
	realClock := &clock.RealClock{}

	return &sharedIndexInformer{
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
		processor:                       &sharedProcessor{clock: realClock},
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		objectDescription:               options.ObjectDescription,
		resyncCheckPeriod:               options.ResyncPeriod,
		defaultEventHandlerResyncPeriod: options.ResyncPeriod,
		clock:                           realClock,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
	}
}

NewSharedIndexInformerWithOptions 函數(shù)內(nèi)創(chuàng)建 informer sharedIndexInformer。可以看到,sharedIndexInformer 內(nèi)包括了 indexer 核心組件。

informer 創(chuàng)建完成。接下來(lái)為 informer 添加回調(diào)函數(shù) EventHandler

2.2 創(chuàng)建 EventHandler

代碼實(shí)現(xiàn)如下:

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        log.Printf("New Pod Added to Store: %s", mObj.GetName())
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        oObj := oldObj.(v1.Object)
        nObj := newObj.(v1.Object)
        log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
    },
    DeleteFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        log.Printf("Pod Deleted from Store: %s", mObj.GetName())
    },
})

創(chuàng)建 EventHandlerhandler 中包括三種回調(diào)函數(shù):AddFuncUpdateFuncDeleteFunc,三種回調(diào)函數(shù)分別在資源有增加,變更,刪除時(shí)觸發(fā)。

sharedIndexInformer.AddEventHandler 內(nèi),將 handler 傳遞給 sharedIndexInformer.AddEventHandlerWithResyncPeriod 方法,該方法主要?jiǎng)?chuàng)建 listener 對(duì)象:

// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
	return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
    ...
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)

    if !s.started {
		return s.processor.addListener(listener), nil
	}
    ...
}

// client-go/tools/cache/shared_informer.go
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
    ...

	p.listeners[listener] = true
    ...

	return listener
}

listener 對(duì)象包含通道 addChnextCh,以及 handler 等對(duì)象。最后將 listener 存入 sharedIndexInformer.sharedProcessor 中。

創(chuàng)建完 informerEventHandler,接下來(lái)該運(yùn)行 informer 了。


總結(jié)

以上是生活随笔為你收集整理的Kubernetes: client-go 源码剖析(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。