Writing a Custom Kubernetes Controller
A diagram from the official Kubernetes GitHub repository:
[Figure: the client-go components and the custom controller components, and the flow of objects and keys between them]
As the diagram shows, the components fall into two parts: client-go and the custom controller.
The client-go part
- Reflector: watches the Kubernetes API for a specific resource type and puts newly observed objects into the Delta FIFO queue; the function that performs this is ListAndWatch.
- Informer: pops objects off the Delta FIFO queue; the function that performs this is processLoop.
- Indexer: provides thread-safe storage for objects and their keys (see the sketch just after this list).
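To make the Indexer's role concrete, here is a minimal sketch, assuming k8s.io/client-go and k8s.io/api are available as module dependencies; everything used is real client-go API, but the example itself is illustrative:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// An Indexer is a thread-safe store whose keys are computed by a KeyFunc;
	// MetaNamespaceKeyFunc produces "namespace/name" keys.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	pod := &v1.Pod{ObjectMeta: meta_v1.ObjectMeta{Name: "mypod", Namespace: "default"}}
	if err := indexer.Add(pod); err != nil {
		fmt.Println(err)
		return
	}

	// Look the object back up by its computed key.
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	obj, exists, _ := indexer.GetByKey(key)
	fmt.Println(key, exists, obj.(*v1.Pod).GetName()) // default/mypod true mypod
}
```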
The custom controller part
- Informer reference: a reference to the Informer instance.
- Indexer reference: a reference to the Indexer instance.
- Resource Event Handlers: callback functions invoked by the Informer; they typically obtain the object's key and put it on the work queue for further processing.
- Work queue: a queue that decouples the delivery of an object from its processing; resource event handler functions are written to extract the delivered object's key and add it to the work queue.
- Process Item: one or more functions that process items from the work queue; they typically use the Indexer reference or a listing wrapper to retrieve the object corresponding to the key (sketched just after this list).
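As a hedged illustration of the Process Item step (the function name processPodKey and the printed messages are my own, not part of client-go), a worker might recover the namespace and name from the key and fetch the object from the Indexer:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// processPodKey is an illustrative "Process Item" function: given a key
// popped from the work queue, it looks the Pod up in the Indexer.
func processPodKey(indexer cache.Indexer, key string) error {
	// Keys produced by MetaNamespaceKeyFunc have the form "namespace/name".
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	obj, exists, err := indexer.GetByKey(key)
	if err != nil {
		return err
	}
	if !exists {
		// The object was deleted between enqueueing and processing.
		fmt.Printf("Pod %s/%s no longer exists\n", namespace, name)
		return nil
	}

	pod := obj.(*v1.Pod)
	fmt.Printf("Processing Pod %s/%s (phase %s)\n", namespace, name, pod.Status.Phase)
	return nil
}

func main() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	// A key for an object that was never added exercises the "deleted" branch.
	if err := processPodKey(indexer, "default/mypod"); err != nil {
		fmt.Println(err)
	}
}
```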
The official client-go example
```go
package main

import (
	"flag"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// Controller bundles the indexer, work queue and informer that drive the sync loop.
type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}

// NewController constructs a Controller.
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// processNextItem handles one key from the work queue.
func (c *Controller) processNextItem() bool {
	// Wait until there is a new item in the working queue
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
	// This allows safe parallel processing because two pods with the same key are never processed in
	// parallel.
	defer c.queue.Done(key)

	// Invoke the method containing the business logic
	err := c.syncToStdout(key.(string))
	// Handle the error if something went wrong during the execution of the business logic
	c.handleErr(err, key)
	return true
}

// syncToStdout is the business logic of the controller. In this controller it simply prints
// information about the pod to stdout. In case an error happened, it has to simply return the error.
// The retry logic should not be part of the business logic.
func (c *Controller) syncToStdout(key string) error {
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		// Note that you also have to check the uid if you have a local controlled resource, which
		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		// Forget about the #AddRateLimited history of the key on every successful synchronization.
		// This ensures that future processing of updates for this key is not delayed because of
		// an outdated error history.
		c.queue.Forget(key)
		return
	}

	// This controller retries 5 times if something goes wrong. After that, it stops trying.
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)

		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the key will be processed later again.
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	// Report to an external entity that, even after several retries, we could not successfully process this key
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// Run starts the informer and worker goroutines and blocks until stopCh is closed.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// Let the workers stop when we are done
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")

	go c.informer.Run(stopCh)

	// Wait for all involved caches to be synced, before processing items from the queue is started
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func main() {
	var kubeconfig string
	var master string

	// Flags for the kubeconfig file and the master URL
	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&master, "master", "", "master url")
	flag.Parse()

	// creates the connection
	config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}

	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// create the pod watcher
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// create the workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
	// whenever the cache is updated, the pod key is added to the workqueue.
	// Note that when we finally process the item from the workqueue, we might see a newer version
	// of the Pod than the version which was responsible for triggering the update.
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	controller := NewController(queue, indexer, informer)

	// We can now warm up the cache for initial synchronization.
	// Let's suppose that we knew about a pod "mypod" on our last run, therefore add it to the cache.
	// If this pod is not there anymore, the controller will be notified about the removal after the
	// cache has synchronized.
	indexer.Add(&v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      "mypod",
			Namespace: v1.NamespaceDefault,
		},
	})

	// Now let's start the controller
	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)

	// Wait forever
	select {}
}
```
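To try the example against a cluster, point it at a kubeconfig, e.g. `go run main.go -kubeconfig=$HOME/.kube/config` (the `-kubeconfig` and `-master` flags are the ones defined in `main` above). Once the cache syncs, the controller prints a `Sync/Add/Update for Pod <name>` line for each Pod in the default namespace, and, because the cache was pre-seeded with a Pod named `mypod`, a "does not exist anymore" line for that Pod if no such Pod is actually present.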
Reposted from: https://www.cnblogs.com/wangjq19920210/p/11527311.html