日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Istio Pilot 源码分析(一)

發布時間:2023/12/4 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Istio Pilot 源码分析(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

張海東,??多點生活(成都)云原生開發工程師。

Istio?作為目前 Servic Mesh 方案中的翹楚,吸引著越來越多的企業及開發者。越來越多的團隊想將其應用于微服務的治理,但在實際落地時卻因為不了解?Istio?黑盒中的運行機制而左右為難,本文將基于 1.7 的源碼講解?Istio?的核心組件?Pilot?的結構及運行流程,希望對讀者應用?Istio?有所助益。

注:本文基于?istio release-1.7?分支分析,其他版本的代碼結構會有所不同。

背景

隨著?Istio?1.7 的發布,內部組件精簡后的?istiod?日趨穩定,越來越多的公司將其應用到自身微服務的流量治理、安全通信及監測中。多點也不例外,應用?Istio?來落地業務系統所有?Dubbo?服務的網格化,下沉?SDK?邏輯,解決基礎中間件與業務系統過于耦合等痛點。目前,我們是通過自己開發的?Controller?組件對接?Zookeeper?等注冊中心,將注冊到?Zookeeper?的節點實時轉化為?ServiceEntry?及?WorkloadEntry?等?Istio?配置類型寫入?kube-apiserver,再由?Pilot?轉化為?xDS?協議下發至數據面,同時對集群、虛擬機中的服務進行治理。隨著公司服務網格化的逐步落地,對?Istio?及數據面組件源碼級掌握的訴求越來越高,沒有足夠的深度及廣度很難解決開發過程中遇到的難題,讓我們一起揭開?Istio?神秘的面紗,看看黑箱內部是如何運作的。

本文作為?Istio?控制面組件?Pilot?的源碼分析系列,主要面向剛接觸?Istio?或僅停留在使用?Istio?基本配置類型(如?VirtualService、DestinationRule?等)的同學,需要熟悉?Istio?的一些?基礎概念及名詞[1]?。文章會涉及較多的代碼細節,我們會以不同的篇幅分別介紹以下內容:

1.pilot-discovery?宏觀架構及啟動流程梳理2.pilot-discovery?接口設計及關鍵接口分析3.pilot-discovery xDS?生成及下發流程梳理4.pilot-agent?流程梳理5.pilot?中的身份認證及安全通信解析

相信通過源碼一步一步分析,能消除讀者對?Pilot?的陌生感,在基于?Pilot?做適配開發時會更加清楚的了解其底層運行邏輯,碰到問題時也能更好的定位。

Pilot?的代碼主要分為兩部分:

?pilot-discovery?pilot-agent

其中?pilot-agent?負責數據面?Sidecar?實例的生命周期管理,而?pilot-discovery?負責控制面流量管理配置及路由規則的生成和下發。

宏觀架構

pilot-discovery?的核心組件如圖:

pilot-discovery-struct

其中?Server?為?pilot-discovery?的主服務,包含了三個比較重要的組件:

?Config Controller:從不同來源接收流量控制和路由規則等?Istio?的配置,并響應各類事件。?Service Controller:從不同注冊中心同步服務及實例,并響應各類事件。?EnvoyXdsServer:核心的?xDS?協議推送服務,根據上面組件的數據生成?xDS?協議并下發。

Config Controller?比較核心的就是對接?Kubernetes,從?kube-apiserver?中?Watch?集群中的?VirtualService、ServiceEntry、DestinationRules?等配置信息,有變化則生成?PushRequest?推送至?EnvoyXdsServer?中的推送隊列。除此之外,還支持對接?MCP(Mesh Configuration Protocol)?協議的?gRPC Server,如?Nacos?的?MCP?服務等,只需要在?meshconfig?中配置?configSources?即可。最后一種是基于內存的?Config Controller?實現,通過?Watch?一個文件目錄,加載目錄中的?yaml?文件生成配置數據,主要用來測試。

Service Controller?目前原生支持?Kubernetes?和?Consul,注冊在這些注冊中心中的服務可以無痛接入?Mesh,另外一種比較特殊,就是?ServiceEntryStore,它本質是儲存在?Config Controller?中的?Istio?配置數據,但它描述的卻是集群外部的服務信息,詳情可閱讀文檔?ServiceEntry[2],Istio?通過它將集群外部,如部署在虛擬機中的服務、非?Kubernetes?的原生服務同步到?Istio?中,納入網格統一進行流量控制和路由,所以?ServiceEntryStore?也可以視為一種注冊中心。還有一種就是?Mock Service Registry,主要用來測試。

ServiceEntryStore?從?Config Controller?到?Service Controller?的轉化流程大致如圖(后續會做詳細的代碼分析,這里簡單了解一下即可):

pilot-discovery-serviceentrystore

ConfigStores?是一個列表,里面存儲了各類?Istio?配置文件,包括?ServiceEntry?、WorkloadEntry?等服務數據,也包括?VirtualService、DestinationRules、Sidecar?等流量控制、路由規則的配置數據,pilot-discovery?將這些?ConfigStores?聚合成一個?configController?統一進行管理,之后再從其中衍生出?IstioConfigStore,將其作為?serviceEntryStore?的配置源。serviceEntryStore?其實就是?ServiceEntry Controller,響應?ServiceEntry?和?WorkloadEntry?這類服務信息的變化。

EnvoyXdsServer?比較核心,一切與?xDS?協議相關的接收、轉換、下發操作都由它完成。EnvoyXdsServer?對接所有集群中的邊車代理,如?Envoy、MOSN?等,當配置或服務發生變化時主動推送,也會響應代理發送的請求,依據請求的信息下發相應的?xDS?配置。

理解了這三個核心組件的定義,就能比較好的理解下面分析的各類流程了。

pilot-discovery?的整個業務流程梳理如下,可以先大概瀏覽一遍,之后我們逐一進行分析:

pilot-discovery-sequence-all

啟動流程梳理

首先詳細看一下?pilot-discovery?的啟動流程。pilot-discovery?組件的入口代碼在?istio/pilot/cmd/pilot-discovery?中。該目錄中包含兩個文件:?main.go?和?request.go。main.go?中定義了?pilot-discovery?根命令及?discovery?命令,是啟動服務發現及配置下發的主流程; 另一個文件?request.go?中定義了?request?命令,用來請求?Pilot?中的?metrics/debug?接口,多用來調試。

main.go?中?discoveryCmd的?RunE?函數定義了啟動過程,代碼如下:

// 創建一個接收空結構的 stop channel 用來停止所有 servers stop := make(chan struct{}) // 創建服務發現的 Server discoveryServer, err := bootstrap.NewServer(serverArgs) if err != nil {return fmt.Errorf("failed to create discovery service: %v", err) } // 運行 Server 中注冊的所有服務 if err := discoveryServer.Start(stop); err != nil {return fmt.Errorf("failed to start discovery service: %v", err) } // 等待 SIGINT 和 SIGTERM 信號并關閉 stop channel cmd.WaitSignal(stop)

啟動流程如圖所示:

pilot-discovery-init

初始化流程

接下來介紹?discoveryServer?,即?pilot-discovery?組件的核心。在這之前先看下?Server?的結構,代碼位于?istio/pilot/pkg/bootstrap/server.go?文件中。

Server?的關鍵字段如下:

type Server struct {XDSServer *xds.DiscoveryServer // Xds 服務environment *model.Environment // Pilot 環境所需的 API 集合kubeRegistry *kubecontroller.Controller // 處理 Kubernetes 主集群的注冊中心multicluster *kubecontroller.Multicluster // 處理 Kubernetes 多個集群的注冊中心configController model.ConfigStoreCache // 統一處理配置數據(如 VirtualService 等) 的 ControllerConfigStores []model.ConfigStoreCache // 不同配置信息的緩存器,提供 Get、List、Create 等方法serviceEntryStore *serviceentry.ServiceEntryStore // 單獨處理 ServiceEntry 的 ControllerfileWatcher filewatcher.FileWatcher // 文件監聽器,主要 watch meshconfig 和 networks 配置文件等startFuncs []startFunc // 保存了上述所有服務的啟動函數,便于在 Start() 方法中批量啟動及管理 }

再看?NewServer()?方法中的內容,有以下幾個關鍵步驟:

image.png

我們對每個步驟逐一進行分析:

初始化?Environment

什么是?Environment?呢?根據定義?Environment?為?Pilot?提供了一個匯總的、運行中所需的 API 集合。?Environment?中字段(接口)如下:

type Environment struct {ServiceDiscovery // 服務發現的接口模型,主要列出 services 和 instancesIstioConfigStore // Istio 配置文件的存儲器,主要列出 ServiceEntry 等配置mesh.Watcher // mesh config 文件的監聽器mesh.NetworksWatcher // mesh network config 文件的監聽器PushContext *PushContext // 在推送(下發 xDS)生成期間保存信息的上下文DomainSuffix string // istio server 默認的后綴域名 }

其中?PushContext?是?Pilot?在推送?xDS?前,生成配置期間保存相關信息的上下文的地方,在全量推送配置和配置發生改變時重置。它會保存所有的錯誤和統計信息,并緩存一些配置的計算信息。?ServiceDiscovery?提供了枚舉?Istio?中服務和實例的方法。?mesh.Watcher?和?mesh.NetworksWatcher?負責監聽?istiod?啟動時掛載的兩個配置文件,這兩個配置文件是通過?configmap?映射到?Pod?的文件系統中的,監聽器將在監聽到配置文件變化時運行預先注冊的?Handler?。文件掛載參考?istiod?的配置文件:

apiVersion: v1 kind: Pod metadata:name: istiod-56c488887d-z9k5cnamespace: istio-system spec:containers:volumeMounts:- mountPath: /etc/istio/configname: config-volumevolumes:- configMap:defaultMode: 420name: istioname: config-volume

相應的配置存儲在?istio-system/istio?這個?configmap?中,里面保存了?mesh?和?meshNetworks?兩種配置,樣例如下:

apiVersion: v1 kind: ConfigMap metadata:name: istionamespace: istio-system data:mesh: |-accessLogEncoding: TEXTaccessLogFile: ""accessLogFormat: ""defaultConfig:binaryPath: /usr/local/bin/mosnconcurrency: 2configPath: ./etc/istio/proxy...meshNetworks: 'networks: {}'

再回頭看?Environment?的初始化:

e := &model.Environment{PushContext: model.NewPushContext(),DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix, } ac := aggregate.NewController(aggregate.Options{MeshHolder: e, }) e.ServiceDiscovery = ac

首先是初始化了一份?PushContext?,創建?PushContext?所需的各種列表和?Map?。其次是初始化了一個聚合所有注冊中心的?Controller?作為?Environment?中的?ServiceDiscovery?。該?Controller?提供從所有注冊中心(如?Kubernetes, Consul, MCP?等)獲取服務和實例列表的方法。這里傳入了一個參數?MeshHolder?是想利用?Environment?中的?mesh.Watcher?將?mesh?這個配置同步過去。

初始化?Server

Server?的結構之前分析過,這里將之前初始化的?Environment?傳入后,開始初始化?XDSServer?。

s := &Server{clusterID: getClusterID(args),environment: e,XDSServer: xds.NewDiscoveryServer(e, args.Plugins), // 初始化 XDSServerfileWatcher: filewatcher.NewWatcher(),httpMux: http.NewServeMux(),monitoringMux: http.NewServeMux(),readinessProbes: make(map[string]readinessProbe), }

XDSServer?相關的代碼在?istio/pilot/pkg/xds/discovery.go?中,對應為?DiscoveryServer?,該服務為?Envoy xDS APIs的?gRPC?實現。?DiscoveryServer?關鍵定義如下:

type DiscoveryServer struct {Env *model.Environment // 即上述 pilot server 中的 EnvironmentConfigGenerator core.ConfigGenerator // 控制面 Istio 配置的生成器,如 VirtualService 等Generators map[string]model.XdsResourceGenerator // 針對不同配置類型的定制化生成器concurrentPushLimit chan struct{}// 不同服務所有實例的集合,增量更新,key 為 service 和 namespace// EndpointShards 中是以不同的注冊中心名為 key 分組保存實例EndpointShardsByService map[string]map[string]*EndpointShards pushChannel chan *model.PushRequest // 接收 push 請求的 channelpushQueue *PushQueue // 防抖之后,真正 Push xDS 之前所用的緩沖隊列adsClients map[string]*Connection // ADS 和 EDS 的 gRPC 連接StatusReporter DistributionStatusCache // 監聽 xDS ACK 和連接斷開// xDS 狀態更新的生成器(更新 connect, disconnect, nacks, acks)// 狀態更新后向所有 connection 推送 DiscoveryResponseInternalGen *InternalGen serverReady bool // 表示緩存已同步,server 可以接受請求debounceOptions debounceOptions // 防抖設置cache Cache // xDS 資源的緩存,目前僅適用于 EDS,線程安全 }

初始化?MeshConfig?、?KubeClient?、?MeshNetworks?和?MeshHandlers

s.initMeshConfiguration(args, s.fileWatcher) if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err) } s.initMeshNetworks(args, s.fileWatcher) s.initMeshHandlers()

這幾個初始化函數比較好理解,?initMeshConfiguration?和?initMeshNetworks?都是通過?fileWatcher?對?istiod?從?configmap?中掛載的兩個配置文件?mesh?和?meshNetworks?進行監聽。當配置文件發生變化時重載配置并觸發相應的?Handlers?。

filewatcher?的代碼在另一個管理通用工具包的項目里:?github.com/istio/pkg/filewatcher?,感興趣的同學可以再詳細研究下,底層使用到了?fsnotify[3]?這個庫來推送文件變化事件。

initMeshHandlers?為上述兩個配置文件注冊了兩個?Handler?,當配置文件發生變化時觸發全量?xDS?下發。

初始化?Controllers

這部分比較核心,初始化了三種控制器分別處理證書、配置信息和注冊信息,證書及安全相關的內容本篇先暫不討論。主要來看?initConfigController?和?initServiceControllers?。

func (s *Server) initControllers(args *PilotArgs) error {log.Info("initializing controllers")if err := s.initCertController(args); err != nil {return fmt.Errorf("error initializing certificate controller: %v", err)}if err := s.initConfigController(args); err != nil {return fmt.Errorf("error initializing config controller: %v", err)}if err := s.initServiceControllers(args); err != nil {return fmt.Errorf("error initializing service controllers: %v", err)}return nil }

配置信息大都是?Istio?定義的一系列?CRD(如?VirtualService?、?DestinationRules?等),一個控制面可以通過?MCP?同時接入多個?Kubernetes?之外的配置數據源,也可通過文件目錄(主要用來調試)掛載,默認是讀取 Kubernetes 中的配置數據:

func (s *Server) initK8SConfigStore(args *PilotArgs) error {configController, err := s.makeKubeConfigController(args)...s.initStatusController(args, features.EnableStatus) // 初始化上面提到的 StatusReporterreturn nil }

配置數據包括以下類型,具體每個類型的含義?Istio?官網都有介紹及用例,這里不再贅述:

// PilotServiceApi contains only collections used by Pilot, including experimental Service Api. PilotServiceApi = collection.NewSchemasBuilder().MustAdd(IstioNetworkingV1Alpha3Destinationrules).MustAdd(IstioNetworkingV1Alpha3Envoyfilters).MustAdd(IstioNetworkingV1Alpha3Gateways).MustAdd(IstioNetworkingV1Alpha3Serviceentries).MustAdd(IstioNetworkingV1Alpha3Sidecars).MustAdd(IstioNetworkingV1Alpha3Virtualservices).MustAdd(IstioNetworkingV1Alpha3Workloadentries).MustAdd(IstioNetworkingV1Alpha3Workloadgroups).MustAdd(IstioSecurityV1Beta1Authorizationpolicies).MustAdd(IstioSecurityV1Beta1Peerauthentications).MustAdd(IstioSecurityV1Beta1Requestauthentications).MustAdd(K8SServiceApisV1Alpha1Gatewayclasses).MustAdd(K8SServiceApisV1Alpha1Gateways).MustAdd(K8SServiceApisV1Alpha1Httproutes).MustAdd(K8SServiceApisV1Alpha1Tcproutes).Build()

詳細看下?initK8SConfigStore?中的?makeKubeConfigController?方法,這里初始化了一個處理?Istio CRDs?的?Client?,實現?ConfigStoreCache?這個接口中增刪改查等方法。

func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {c, err := crdclient.New(s.kubeClient, buildLedger(args.RegistryOptions), args.Revision, args.RegistryOptions.KubeOptions)if err != nil {return nil, err}return c, nil }

Client?定義如下:

type Client struct {schemas collection.Schemas // Istio CRDs shemasdomainSuffix stringconfigLedger ledger.Ledgerrevision stringkinds map[resource.GroupVersionKind]*cacheHandler // 跟蹤已知類型的所有緩存 handlerqueue queue.InstanceistioClient istioclient.InterfaceserviceApisClient serviceapisclient.Interface }

再依次對這些類型創建?Informer?開啟監聽。回到?initConfigController?,創建好?ConfigStore?之后,再對其進一步包裝:

// 將所有 ConfigStore 聚合并緩存 aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores) // 通過 s.configController 統一操作上面聚合的 ConfigStores s.configController = aggregateConfigController // 將其包裝為 IstioConfigStore 傳入 environment,便于操作 ServiceEntry/Gateway 等資源 // IstioConfigStore 會在之后的 ServiceEntryStore 中用到 s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

最后將該?Controller?的啟動函數注冊到?startFuncs?中:

s.addStartFunc(func(stop <-chan struct{}) error {go s.configController.Run(stop)return nil })

再來看?initServiceControllers?處理服務發現的?Controller?初始化:

func (s *Server) initServiceControllers(args *PilotArgs) error {serviceControllers := s.ServiceController()for _, r := range args.RegistryOptions.Registries {// ...switch serviceRegistry {case serviceregistry.Kubernetes:if err := s.initKubeRegistry(serviceControllers, args); err != nil {return err}// ...}// ... }

從之前初始化的?environment.ServiceDiscovery?中獲取已注冊的服務中心,如果是?Kubernetes?則執行?initKubeRegistry:

// initKubeRegistry creates all the k8s service controllers under this pilot func (s *Server) initKubeRegistry(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {// ...log.Infof("Initializing Kubernetes service registry %q", args.RegistryOptions.KubeOptions.ClusterID)kubeRegistry := kubecontroller.NewController(s.kubeClient, args.RegistryOptions.KubeOptions)s.kubeRegistry = kubeRegistryserviceControllers.AddRegistry(kubeRegistry)return }

進一步初始化?Kubernetes?注冊中心,方法為?NewController?,先看一下這個?Controller?的結構:

type Controller struct {client kubernetes.Interfacequeue queue.InstanceserviceInformer cache.SharedIndexInformerserviceLister listerv1.ServiceListerendpoints kubeEndpointsControllernodeInformer cache.SharedIndexInformernodeLister listerv1.NodeListerpods *PodCachemetrics model.MetricsnetworksWatcher mesh.NetworksWatcherxdsUpdater model.XDSUpdaterdomainSuffix stringclusterID stringserviceHandlers []func(*model.Service, model.Event)instanceHandlers []func(*model.ServiceInstance, model.Event)workloadHandlers []func(*model.WorkloadInstance, model.Event)sync.RWMutexservicesMap map[host.Name]*model.ServicenodeSelectorsForServices map[host.Name]labels.InstancenodeInfoMap map[string]kubernetesNodeexternalNameSvcInstanceMap map[host.Name][]*model.ServiceInstanceworkloadInstancesByIP map[string]*model.WorkloadInstanceranger cidranger.RangernetworkForRegistry stringonce sync.Once }

可以看到?Controller?對?Services?、?Nodes?、?Pods?等資源各自初始化了?Informer?、 Lister 以及對應的 Map,各類 Handlers 在 Informer 監聽到增刪改查時推送相應的事件到 queue ,再由?onServiceEvent?、?onNodeEvent?、?c.pods.onEvent?中更新對應的 Map 。

回到?initServiceControllers?,初始化完 Kubernetes 注冊中心之后,還需要關注 Kubernetes 集群之外的服務,這些服務基本都是通過?ServiceEntry?注冊到控制面的,所有?ServiceEntry?配置數據目前還都在之前初始化的?configController?配置中心控制器中,這里將?ServiceEntry?數據單獨拎出來初始化一個?ServicEntry?注冊中心,加入到?serviceControllers?中:

s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer) serviceControllers.AddRegistry(s.serviceEntryStore)

serviceEntryStore?相關的邏輯會在后續 xDS 下發流程的分析中再闡述。

最后將?serviceControllers?中所有的服務注冊中心的?Controller?的啟動函數都注冊到?startFuncs?中:

s.addStartFunc(func(stop <-chan struct{}) error {go serviceControllers.Run(stop)return nil }) // Run starts all the controllers func (c *Controller) Run(stop <-chan struct{}) {for _, r := range c.GetRegistries() {go r.Run(stop)}<-stoplog.Info("Registry Aggregator terminated") }

初始化?RegistryEventHandlers

initRegistryEventHandlers?設置了三個事件處理器?serviceHandler?、?instanceHandler?和?configHandler?分別響應服務、實例和配置數據的更新事件。

serviceHandler?如下:

serviceHandler := func(svc *model.Service, _ model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: gvk.ServiceEntry,Name: string(svc.Hostname),Namespace: svc.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},}s.XDSServer.ConfigUpdate(pushReq) } if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {return fmt.Errorf("append service handler failed: %v", err) }

可以看到當服務本身發生變化時,會觸發?xDS?的全量下發,所有與該服務相關的代理都會收到推送。

實例的變動也會觸發?xDS?的全量下發,不過僅在連接?Consul?時生效。Kubernetes?和?MCP?這兩種服務發現的場景下,更新事件的?Handler?是在別的地方注冊的。

instanceHandler := func(si *model.ServiceInstance, _ model.Event) {// TODO: This is an incomplete code. This code path is called for consul, etc.// In all cases, this is simply an instance update and not a config update. So, we need to update// EDS in all proxies, and do a full config push for the instance that just changed (add/update only).s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: gvk.ServiceEntry,Name: string(si.Service.Hostname),Namespace: si.Service.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},}) } // 跳過 Kubernetes 和 MCP for _, registry := range s.ServiceController().GetRegistries() {// Skip kubernetes and external registries as they are handled separatelyif registry.Provider() == serviceregistry.Kubernetes ||registry.Provider() == serviceregistry.External {continue}if err := registry.AppendInstanceHandler(instanceHandler); err != nil {return fmt.Errorf("append instance handler to registry %s failed: %v", registry.Provider(), err)} }

上一步初始化了?configController?,它操作的對象主要是像?VirtualService?、?DestinationRules?這些?Istio?定義的配置,這些配置的變化也會觸發?xDS?的全量下發,所有與該配置相關的代理都會收到推送。不過?ServiceEntry?和?WorkloadEntry?除外,這兩個資源的配置下發是由?ServiceEntryStore?管理的,之前在初始化?ServiceController?時定義的?s.serviceEntryStore?會處理,之后的篇幅再做詳細介紹。

configHandler := func(_, curr model.Config, event model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind: curr.GroupVersionKind,Name: curr.Name,Namespace: curr.Namespace,}: {}},Reason: []model.TriggerReason{model.ConfigUpdate},}s.EnvoyXdsServer.ConfigUpdate(pushReq) }

下面是跳過?ServiceEntry?和?WorkloadEntry?的代碼:

for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind() {continue}if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.Resource().GroupVersionKind() {continue}s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler) }

初始化?DiscoveryService

func (s *Server) initDiscoveryService(args *PilotArgs) error {log.Infof("starting discovery service")// Implement EnvoyXdsServer grace shutdowns.addStartFunc(func(stop <-chan struct{}) error {s.EnvoyXdsServer.Start(stop)return nil})s.initGrpcServer(args.KeepaliveOptions)grpcListener, err := net.Listen("tcp", args.ServerOptions.GRPCAddr)if err != nil {return err}s.GRPCListener = grpcListenerreturn nil }

這里將?EnvoyXdsServer?的啟動添加至?startFuncs?中,便于后續統一啟動。并初始化?gRPC?服務器,監聽對應的端口。

初始化?gRPC?服務器,并注冊?xDS V2?和?xDS V3?的?ADS?服務到?gRPC?服務器上:

func (s *Server) initGrpcServer(options *istiokeepalive.Options) {grpcOptions := s.grpcServerOptions(options)s.grpcServer = grpc.NewServer(grpcOptions...)s.EnvoyXdsServer.Register(s.grpcServer)reflection.Register(s.grpcServer) } func (s *DiscoveryServer) Register(rpcs *grpc.Server) {// Register v2 and v3 serversdiscovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)discoveryv2.RegisterAggregatedDiscoveryServiceServer(rpcs, s.createV2Adapter()) }

可以看到?ADS?的?gRPC?服務包含兩個流式方法,一個是全量推送,一個是增量推送。

var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{ServiceName: "envoy.service.discovery.v3.AggregatedDiscoveryService",HandlerType: (*AggregatedDiscoveryServiceServer)(nil),Methods: []grpc.MethodDesc{},Streams: []grpc.StreamDesc{{StreamName: "StreamAggregatedResources",Handler: _AggregatedDiscoveryService_StreamAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},{StreamName: "DeltaAggregatedResources",Handler: _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},},Metadata: "envoy/service/discovery/v3/ads.proto", }

注冊?kubeClient.RunAndWait

將?kubeClient.RunAndWait?方法注冊至?startFuncs?中,?RunAndWait?啟動后所有?Informer?將開始緩存,并等待它們同步完成。之所以在最后運行,可以保證所有的?Informer?都已經注冊。

if s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil}) }

啟動過程

啟動流程比較簡單,核心是依次啟動初始化過程中注冊到?startFuncs?中的啟動函數:

for _, fn := range s.startFuncs {if err := fn(stop); err != nil {return err} }

然后調用?waitForCache?等待需要監聽資源的?Informer?緩存完畢,完成后開啟?HTTP?服務響應?readiness?事件。

至此?pilot-discovery?的啟動流程就結束了,有了大概了解后,可以大致歸納出整個?Pilot?的接口架構。

接口設計

在接口設計方面,Pilot?主要有兩類接口:一種是?Store?類接口,定義對資源的增刪改查等方法;另一種是?Controller?類接口,定義了?RegisterEventHandler?和?Run?方法。

Store?類接口主要指?ConfigStore?接口,以及它衍生出的?IstioConfigStore,后者操作的對象為?Istio?定義的配置類型,如?VirtualService、ServiceEntry?等。

而?Controller?類接口指基于?ConfigStore?定義的?ConfigStoreCache?接口,這個接口在哪里用到了呢?之前討論初始化流程的時候,分析過?Pilot?的?Server?的結構,其中用到該接口的有如下幾個字段:

type Server struct {configController model.ConfigStoreCacheConfigStores []model.ConfigStoreCacheserviceEntryStore *serviceentry.ServiceEntryStore } type ServiceEntryStore struct {store model.IstioConfigStore }

可以看到?ConfigStores?是存儲所有配置類數據的?Controller?的地方,ConfigStores?都是在哪里添加的呢?之前分析?initConfigController?方法中提到過,可以再對照代碼看一下調用的地方:

image.png

都添加完畢后,會把這些?ConfigStoreCache?都聚合到?Server.configController?中統一處理。

// Wrap the config controller with a cache.aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)if err != nil {return err}s.configController = aggregateConfigController

而?ServiceEntryStore?中用到的?IstioConfigStore?也是在這里得到的:

s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

以上,當服務啟動后,會逐個調用這些?ConfigStoreCache?中的?Run?方法處理資源的增刪改事件。

總結

pilot-discovery?的啟動流程初看是比較復雜,但理清楚中間核心的步驟后結構也比較清晰。有了本篇的介紹,之后再走讀幾遍代碼,相信就能很好的掌握?pilot-discovery?初始化的流程。

Pilot?源碼分析的第一部分就到這里,后續會針對重要的組件和接口做更細致的分析,如?EnvoyXdsServer?、ServiceEntryStore?等,以及梳理?xDS?協議的生成和下發流程,會比?pilot-discovery?的啟動流程復雜的多,敬請期待。

參考

?Istio Pilot 代碼深度解析 - 趙化冰[4]

引用鏈接

[1]?基礎概念及名詞:?https://istio.io/latest/zh/docs/concepts/traffic-management/
[2]?ServiceEntry:?https://istio.io/latest/docs/reference/config/networking/service-entry/
[3]?fsnotify:?https://github.com/fsnotify/fsnotify
[4]?Istio Pilot 代碼深度解析 - 趙化冰:?https://zhaohuabing.com/post/2019-10-21-pilot-discovery-code-analysis/

總結

以上是生活随笔為你收集整理的Istio Pilot 源码分析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 日本r级电影在线观看 | 欧美一区二区三区 | 婷婷免费 | 国产午夜手机精彩视频 | 看黄色大片 | 91插插插插插插插 | 好看的av网址 | 欧美成网| 日韩三级视频在线播放 | 国产综合视频一区二区 | 亚洲欧美综合 | 星空无限mv国产剧入选 | 人人草超碰 | 国产青青草 | 成熟女人毛片www免费版在线 | 天堂中文在线最新 | 日韩精品电影在线 | 37p粉嫩大胆色噜噜噜 | 久久久青| 亚洲第一区av | 国产精品久久久久久久午夜 | 污网站免费在线 | av网站在线免费看 | 国产日韩精品久久 | 午夜影视av | 一区二区国产欧美 | 久草精品在线观看 | 午夜一区| 亚洲av永久无码国产精品久久 | 超碰在线98| 可以免费看污视频的网站 | 欧美色图首页 | 国产精品一级二级三级 | 色综合99久久久无码国产精品 | 中国色老太hd| 91夜色 | 九色视频在线观看 | 日韩精品久久久久久 | 日韩色黄大片 | 解开人妻的裙子猛烈进入 | 波多野结衣毛片 | 欧美日韩亚洲不卡 | 男人的天堂网av | 丝袜 中出 制服 人妻 美腿 | h片在线观看网站 | 97伊人超碰 | 免费人成在线观看网站 | 国模杨依粉嫩蝴蝶150p | 医生强烈淫药h调教小说视频 | 伊人毛片 | 日韩美女视频网站 | 制服丝袜中文字幕在线 | 精品97人妻无码中文永久在线 | 日韩性xxxx | 国产91精品久久久久久久 | 中日韩欧美在线观看 | 国产吃瓜黑料一区二区 | 亚洲精品美女在线观看 | 蜜臀尤物一区二区三区直播 | 日本国产一区二区 | 国产99对白在线播放 | 日韩三级视频在线观看 | 欧美色妞网 | 婷婷丁香社区 | 美日韩一二三区 | 老头巨大又粗又长xxxxx | 午夜欧美日韩 | 日本一级做a爱片 | 免费精品国产 | 国产日韩在线视频 | 丰满少妇在线观看资源站 | 久久午夜网站 | 日韩大片av| 欧洲熟妇的性久久久久久 | 寻找身体恐怖电影免费播放 | 人人妻人人澡人人爽人人精品 | 国产激情无套内精对白视频 | 一区二区福利 | 亚洲大尺度在线观看 | 久久狠 | 成人免费一级 | 男人天堂新地址 | 男女又爽又黄 | 综合久久久久久久 | 激情五月深爱五月 | 不卡视频国产 | 成人h在线观看 | 一级香蕉视频在线观看 | 亚洲男人天堂影院 | 伊人网伊人影院 | 亚洲free性xxxx护士hd | 欧美日韩精品在线 | 少妇被爽到高潮动态图 | xxxxx色 | 老司机精品视频在线播放 | 国产精品久免费的黄网站 | 色成人综合 | 免费在线观看中文字幕 | 蜜桃成熟时李丽珍国语 |