日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现

發(fā)布時(shí)間:2024/8/23 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Knative 中提供了自動擴(kuò)縮容靈活的實(shí)現(xiàn)機(jī)制,本文從?三橫兩縱?的維度帶你深入了解 KPA 自動擴(kuò)縮容的實(shí)現(xiàn)機(jī)制。讓你輕松駕馭 Knative 自動擴(kuò)縮容。
注:本文基于最新 Knative v0.11.0 版本代碼解讀

KPA 實(shí)現(xiàn)流程圖

在 Knative 中,創(chuàng)建一個(gè) Revision 會相應(yīng)的創(chuàng)建 PodAutoScaler 資源。在KPA中通過操作 PodAutoScaler 資源,對當(dāng)前的 Revision 中的 POD 進(jìn)行擴(kuò)縮容。
針對上面的流程實(shí)現(xiàn),我們從三橫兩縱的維度進(jìn)行剖析其實(shí)現(xiàn)機(jī)制。

三橫

  • KPA 控制器
  • 根據(jù)指標(biāo)定時(shí)計(jì)算 POD 數(shù)
  • 指標(biāo)采集

KPA 控制器

通過Revision 創(chuàng)建PodAutoScaler, 在 KPA 控制器中主要包括兩個(gè)資源(Decider 和 Metric)和一個(gè)操作(Scale)。主要代碼如下

func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {......decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)if err != nil {return fmt.Errorf("error reconciling Decider: %w", err)}if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {return fmt.Errorf("error reconciling Metric: %w", err)}// Metrics services are no longer needed as we use the private services now.if err := c.DeleteMetricsServices(ctx, pa); err != nil {return err}// Get the appropriate current scale from the metric, and right size// the scaleTargetRef based on it.want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)if err != nil {return fmt.Errorf("error scaling target: %w", err)} ...... }

這里先介紹一下兩個(gè)資源:

  • Decider : 擴(kuò)縮容決策的資源,通過Decider獲取擴(kuò)縮容POD數(shù): DesiredScale。
  • Metric:采集指標(biāo)的資源,通過Metric會采集當(dāng)前Revision下的POD指標(biāo)。

再看一下Scale操作,在Scale方法中,根據(jù)擴(kuò)縮容POD數(shù)、最小實(shí)例數(shù)和最大實(shí)例數(shù)確定最終需要擴(kuò)容的POD實(shí)例數(shù),然后修改deployment的Replicas值,最終實(shí)現(xiàn)POD的擴(kuò)縮容, 代碼實(shí)現(xiàn)如下:

// Scale attempts to scale the given PA's target reference to the desired scale. func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) { ......min, max := pa.ScaleBounds()if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)desiredScale = newScale}desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)if !shouldApplyScale {return desiredScale, nil}ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)if err != nil {return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)}currentScale := int32(1)if ps.Spec.Replicas != nil {currentScale = *ps.Spec.Replicas}if desiredScale == currentScale {return desiredScale, nil}logger.Infof("Scaling from %d to %d", currentScale, desiredScale)return ks.applyScale(ctx, pa, desiredScale, ps) }

根據(jù)指標(biāo)定時(shí)計(jì)算 POD 數(shù)

這是一個(gè)關(guān)于Decider的故事。Decider創(chuàng)建之后會同時(shí)創(chuàng)建出來一個(gè)定時(shí)器,該定時(shí)器默認(rèn)每隔 2 秒(可以通過TickInterval 參數(shù)配置)會調(diào)用Scale方法,該Scale方法實(shí)現(xiàn)如下:

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {......metricName := spec.ScalingMetricvar observedStableValue, observedPanicValue float64switch spec.ScalingMetric {case autoscaling.RPS:observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)a.reporter.ReportStableRPS(observedStableValue)a.reporter.ReportPanicRPS(observedPanicValue)a.reporter.ReportTargetRPS(spec.TargetValue)default:metricName = autoscaling.Concurrency // concurrency is used by defaultobservedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)a.reporter.ReportStableRequestConcurrency(observedStableValue)a.reporter.ReportPanicRequestConcurrency(observedPanicValue)a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)}// Put the scaling metric to logs.logger = logger.With(zap.String("metric", metricName))if err != nil {if err == ErrNoData {logger.Debug("No data to scale on yet")} else {logger.Errorw("Failed to obtain metrics", zap.Error(err))}return 0, 0, false}// Make sure we don't get stuck with the same number of pods, if the scale up rate// is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single// pod if we need to scale up.// E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need// 3 pods. See the unit test for this scenario in action.maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)// Same logic, opposite math applies here.maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)dspc := math.Ceil(observedStableValue / spec.TargetValue)dppc := math.Ceil(observedPanicValue / spec.TargetValue)logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",dspc, dppc, maxScaleUp, maxScaleDown)// We want to keep desired pod count in the [maxScaleDown, maxScaleUp] range.desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp)) ......return desiredPodCount, excessBC, true }

該方法主要是從 MetricCollector 中獲取指標(biāo)信息,根據(jù)指標(biāo)信息計(jì)算出需要擴(kuò)縮的POD數(shù)。然后設(shè)置在 Decider 中。另外當(dāng) Decider 中 POD 期望值發(fā)生變化時(shí)會觸發(fā) PodAutoscaler 重新調(diào)和的操作,關(guān)鍵代碼如下:

...... if runner.updateLatestScale(desiredScale, excessBC) {m.Inform(metricKey)} ......

在KPA controller中設(shè)置調(diào)和Watch操作:

......// Have the Deciders enqueue the PAs whose decisions have changed.deciders.Watch(impl.EnqueueKey) ......

指標(biāo)采集

通過兩種方式收集POD指標(biāo):

  • PUSH 收集指標(biāo):通過暴露指標(biāo)接口,外部服務(wù)(如Activitor)可以調(diào)用該接口推送 metric 信息
  • PULL 收集指標(biāo):通過調(diào)用 Queue Proxy 服務(wù)接口收集指標(biāo)。

PUSH 收集指標(biāo)實(shí)現(xiàn)比較簡單,在main.go中 暴露服務(wù),將接收到的 metric 推送到 MetricCollector 中:

// Set up a statserver.statsServer := statserver.New(statsServerAddr, statsCh, logger) .... go func() {for sm := range statsCh {collector.Record(sm.Key, sm.Stat)multiScaler.Poke(sm.Key, sm.Stat)}}()

PULL 收集指標(biāo)是如何收集的呢? 還記得上面提到的Metric資源吧,這里接收到Metric資源又會創(chuàng)建出一個(gè)定時(shí)器,這個(gè)定時(shí)器每隔 1 秒會訪問 queue-proxy 9090 端口采集指標(biāo)信息。關(guān)鍵代碼如下:

// newCollection creates a new collection, which uses the given scraper to // collect stats every scrapeTickInterval. func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {c := &collection{metric: metric,concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),rpsBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),scraper: scraper,stopCh: make(chan struct{}),}logger = logger.Named("collector").With(zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))c.grp.Add(1)go func() {defer c.grp.Done()scrapeTicker := time.NewTicker(scrapeTickInterval)for {select {case <-c.stopCh:scrapeTicker.Stop()returncase <-scrapeTicker.C:stat, err := c.getScraper().Scrape()if err != nil {copy := metric.DeepCopy()switch {case err == ErrFailedGetEndpoints:copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error())case err == ErrDidNotReceiveStat:copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error())default:copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.")}logger.Errorw("Failed to scrape metrics", zap.Error(err))c.updateMetric(copy)}if stat != emptyStat {c.record(stat)}}}}()return c }

兩縱

  • 0-1 擴(kuò)容
  • 1-N 擴(kuò)縮容

上面從KPA實(shí)現(xiàn)的 3個(gè)橫向角度進(jìn)行了分析,KPA 實(shí)現(xiàn)了0-1擴(kuò)容以及1-N 擴(kuò)縮容,下面我們從這兩個(gè)縱向的角度進(jìn)一步分析。
我們知道,在 Knative 中,流量通過兩種模式到達(dá)POD: Serve 模式和 Proxy 模式。
Proxy 模式: POD數(shù)為 0 時(shí)(另外針對突發(fā)流量的場景也會切換到 Proxy 模式,這里先不做詳細(xì)解讀),切換到 Proxy 模式。
Serve 模式:POD數(shù)不為 0 時(shí),切換成 Serve 模式。
那么在什么時(shí)候進(jìn)行模式的切換呢?在KPA中的代碼實(shí)現(xiàn)如下:

mode := nv1alpha1.SKSOperationModeServe// We put activator in the serving path in the following cases:// 1. The revision is scaled to 0:// a. want == 0// b. want == -1 && PA is inactive (Autoscaler has no previous knowledge of// this revision, e.g. after a restart) but PA status is inactive (it was// already scaled to 0).// 2. The excess burst capacity is negative.if want == 0 || decider.Status.ExcessBurstCapacity < 0 || want == -1 && pa.Status.IsInactive() {logger.Infof("SKS should be in proxy mode: want = %d, ebc = %d, PA Inactive? = %v",want, decider.Status.ExcessBurstCapacity, pa.Status.IsInactive())mode = nv1alpha1.SKSOperationModeProxy}

0-1 擴(kuò)容

第一步:指標(biāo)采集
在POD數(shù)為0時(shí),流量請求模式為Proxy 模式。這時(shí)候流量是通過 Activitor 接管的,在 Activitor 中,會根據(jù)請求數(shù)的指標(biāo)信息,通過WebSockt調(diào)用 KPA中提供的指標(biāo)接口,將指標(biāo)信息發(fā)送給 KPA 中的 MetricCollector。
在 Activitor 中 main 函數(shù)中,訪問 KPA 服務(wù) 代碼實(shí)現(xiàn)如下

// Open a WebSocket connection to the autoscaler.autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s%s", "autoscaler", system.Namespace(), pkgnet.GetClusterDomainName(), autoscalerPort)logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)go statReporter(statSink, ctx.Done(), statCh, logger)

通過 WebSockt 發(fā)送請求指標(biāo)代碼實(shí)現(xiàn):

func statReporter(statSink *websocket.ManagedConnection, stopCh <-chan struct{},statChan <-chan []autoscaler.StatMessage, logger *zap.SugaredLogger) {for {select {case sm := <-statChan:go func() {for _, msg := range sm {if err := statSink.Send(msg); err != nil {logger.Errorw("Error while sending stat", zap.Error(err))}}}()case <-stopCh:// It's a sending connection, so no drainage required.statSink.Shutdown()return}} }

第二步:根據(jù)指標(biāo)計(jì)算 POD 數(shù)
在 Scale 方法中,根據(jù) PUSH 獲取的指標(biāo)信息,計(jì)算出期望的POD數(shù)。修改 Decider 期望 POD 值,觸發(fā) PodAutoScaler 重新調(diào)和。
第三步:擴(kuò)容
在KPA controller中,重新執(zhí)行 reconcile 方法,執(zhí)行 scaler 對當(dāng)前Revision進(jìn)行擴(kuò)容操作。然后將流量模式切換成 Server 模式。最終實(shí)現(xiàn) 0-1 的擴(kuò)容操作。

1-N 擴(kuò)縮容

第一步:指標(biāo)采集
在 POD 數(shù)不為0時(shí),流量請求模式為 Server 模式。這時(shí)候會通過PULL 的方式訪問當(dāng)前 revision 中所有 POD queue proxy 9090 端口,拉取業(yè)務(wù)指標(biāo)信息, 訪問服務(wù) URL 代碼實(shí)現(xiàn)如下:

... func urlFromTarget(t, ns string) string {return fmt.Sprintf("http://%s.%s:%d/metrics",t, ns, networking.AutoscalingQueueMetricsPort) }

第二步:根據(jù)指標(biāo)計(jì)算 POD 數(shù)
在 Scale 方法中,根據(jù) PULL 獲取的指標(biāo)信息,計(jì)算出期望的POD數(shù)。修改 Decider 期望 POD 值,觸發(fā) PodAutoScaler 重新調(diào)和。
第三步: 擴(kuò)縮容
在 KPA controller中,重新執(zhí)行 reconcile 方法,執(zhí)行 scaler 對當(dāng)前Revision進(jìn)行擴(kuò)縮容操作。如果縮容為 0 或者觸發(fā)突發(fā)流量場景,則將流量模式切換成 Proxy 模式。最終實(shí)現(xiàn) 1-N 擴(kuò)縮容操作。

總結(jié)

相信通過上面的介紹,對Knative KPA的實(shí)現(xiàn)有了更深入的理解,了解了其實(shí)現(xiàn)原理不僅有助于我們排查相關(guān)的問題,更在于我們可以基于這樣的擴(kuò)縮容機(jī)制實(shí)現(xiàn)自定義的擴(kuò)縮容組件,這也正是 Knative 自動擴(kuò)縮容可擴(kuò)展性靈魂所在。


原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

總結(jié)

以上是生活随笔為你收集整理的Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。