日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制

發(fā)布時(shí)間:2024/8/23 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

In-memory Channel是當(dāng)前Knative Eventing中默認(rèn)的Channel, 也是一般剛接觸Knative Eventing首先了解到的Channel。本文通過(guò)分析 In-memory Channel 來(lái)進(jìn)一步了解 Knative Eventing 中Broker/Trigger事件處理機(jī)制。

事件處理概覽

我們先整體看一下Knative Eventing 工作機(jī)制示意圖:

通過(guò) namespace 創(chuàng)建默認(rèn) Broker 如果不指定Channel,會(huì)使用默認(rèn)的 Inmemory Channel。

下面我們從數(shù)據(jù)平面開(kāi)始分析Event事件是如何通過(guò)In-memory Channel分發(fā)到Knative Service

Ingress

Ingress是事件進(jìn)入Channel前的第一級(jí)過(guò)濾,但目前的功能僅僅是接收事件然后轉(zhuǎn)發(fā)到Channel。過(guò)濾功能處理TODO狀態(tài)。

func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {tctx := cloudevents.HTTPTransportContextFrom(ctx)if tctx.Method != http.MethodPost {resp.Status = http.StatusMethodNotAllowedreturn nil}// tctx.URI is actually the path...if tctx.URI != "/" {resp.Status = http.StatusNotFoundreturn nil}ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))defer func() {stats.Record(ctx, MeasureEventsTotal.M(1))}()send := h.decrementTTL(&event)if !send {ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))return nil}// TODO Filter.ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))return h.sendEvent(ctx, tctx, event) }

In-memory Channel

Broker 字面意思為代理者,那么它代理的是誰(shuí)呢?是Channel。為什么要代理Channel呢,而不直接發(fā)給訪問(wèn)Channel。這個(gè)其實(shí)涉及到Broker/Trigger設(shè)計(jì)的初衷:對(duì)事件過(guò)濾處理。我們知道Channel(消息通道)負(fù)責(zé)事件傳遞,Subscription(訂閱)負(fù)責(zé)訂閱事件,通常這二者的模型如下:

這里就涉及到消息隊(duì)列和訂閱分發(fā)的實(shí)現(xiàn)。那么在In-memory Channel中如何實(shí)現(xiàn)的呢?
其實(shí) In-memory 的核心處理在Fanout Handler中,它負(fù)責(zé)將接收到的事件分發(fā)到不同的 Subscription。
In-memory Channel處理示意圖:

事件接收并分發(fā)核心代碼如下:

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {return func(_ provisioners.ChannelReference, m *provisioners.Message) error {if f.config.AsyncHandler {go func() {// Any returned error is already logged in f.dispatch()._ = f.dispatch(m)}()return nil}return f.dispatch(m)} }

當(dāng)前分發(fā)機(jī)制默認(rèn)是異步機(jī)制(可通過(guò)AsyncHandler參數(shù)控制分發(fā)機(jī)制)。

消息分發(fā)機(jī)制:

// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out // requests return successfully, then return nil. Else, return an error. func (f *Handler) dispatch(msg *provisioners.Message) error {errorCh := make(chan error, len(f.config.Subscriptions))for _, sub := range f.config.Subscriptions {go func(s eventingduck.SubscriberSpec) {errorCh <- f.makeFanoutRequest(*msg, s)}(sub)}for range f.config.Subscriptions {select {case err := <-errorCh:if err != nil {f.logger.Error("Fanout had an error", zap.Error(err))return err}case <-time.After(f.timeout):f.logger.Error("Fanout timed out")return errors.New("fanout timed out")}}// All Subscriptions returned err = nil.return nil }

通過(guò)這里的代碼,我們可以看到分發(fā)處理超時(shí)機(jī)制。默認(rèn)為60s。也就是說(shuō)如果分發(fā)的請(qǐng)求響應(yīng)超過(guò)60s,那么In-memory會(huì)報(bào)錯(cuò):Fanout timed out。

Filter

一般的消息分發(fā)會(huì)將消息發(fā)送給訂閱的服務(wù),但在 Broker/Trigger 模型中需要對(duì)事件進(jìn)行過(guò)濾處理,這個(gè)處理的地方就是在Filter 中。

  • 根據(jù)請(qǐng)求獲取Trigger信息。Filter中會(huì)根據(jù)請(qǐng)求的信息拿到 Trigger 名稱,然后通過(guò)獲取Trigger對(duì)應(yīng)的資源信息拿到過(guò)濾規(guī)則
  • 根據(jù)Trigger 過(guò)濾規(guī)則進(jìn)行事件的過(guò)濾處理
  • 最后將滿足過(guò)濾規(guī)則的分發(fā)到Kservice

其中過(guò)濾規(guī)則處理代碼如下:

func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {if ts.Filter == nil || ts.Filter.SourceAndType == nil {r.logger.Error("No filter specified")ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))return false}// Record event count and filtering timestartTS := time.Now()defer func() {filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))}()filterType := ts.Filter.SourceAndType.Typeif filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}filterSource := ts.Filter.SourceAndType.Sources := event.Context.AsV01().SourceactualSource := s.String()if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))return true }

當(dāng)前的機(jī)制是所有的訂閱事件都會(huì)通過(guò) Filter 集中進(jìn)行事件過(guò)濾,如果一個(gè)Broker有大量的訂閱Trigger,是否會(huì)給Filter帶來(lái)性能上的壓力? 這個(gè)在實(shí)際場(chǎng)景 Broker/Trigger 的運(yùn)用中需要考慮到這個(gè)問(wèn)題。

結(jié)論

作為內(nèi)置的默認(rèn)Channel實(shí)現(xiàn),In-memory 可以說(shuō)很好的完成了事件接收并轉(zhuǎn)發(fā)的使命,并且 Knative Eventing 在接下來(lái)的迭代中會(huì)支持部署時(shí)指定設(shè)置默認(rèn)的Channel。有興趣的同學(xué)可以持續(xù)關(guān)注一下。

原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

總結(jié)

以上是生活随笔為你收集整理的透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。