透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制
In-memory Channel是當(dāng)前Knative Eventing中默認(rèn)的Channel, 也是一般剛接觸Knative Eventing首先了解到的Channel。本文通過(guò)分析 In-memory Channel 來(lái)進(jìn)一步了解 Knative Eventing 中Broker/Trigger事件處理機(jī)制。
事件處理概覽
我們先整體看一下Knative Eventing 工作機(jī)制示意圖:
通過(guò) namespace 創(chuàng)建默認(rèn) Broker 如果不指定Channel,會(huì)使用默認(rèn)的 Inmemory Channel。
下面我們從數(shù)據(jù)平面開(kāi)始分析Event事件是如何通過(guò)In-memory Channel分發(fā)到Knative Service
Ingress
Ingress是事件進(jìn)入Channel前的第一級(jí)過(guò)濾,但目前的功能僅僅是接收事件然后轉(zhuǎn)發(fā)到Channel。過(guò)濾功能處理TODO狀態(tài)。
func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {tctx := cloudevents.HTTPTransportContextFrom(ctx)if tctx.Method != http.MethodPost {resp.Status = http.StatusMethodNotAllowedreturn nil}// tctx.URI is actually the path...if tctx.URI != "/" {resp.Status = http.StatusNotFoundreturn nil}ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))defer func() {stats.Record(ctx, MeasureEventsTotal.M(1))}()send := h.decrementTTL(&event)if !send {ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))return nil}// TODO Filter.ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))return h.sendEvent(ctx, tctx, event) }In-memory Channel
Broker 字面意思為代理者,那么它代理的是誰(shuí)呢?是Channel。為什么要代理Channel呢,而不直接發(fā)給訪問(wèn)Channel。這個(gè)其實(shí)涉及到Broker/Trigger設(shè)計(jì)的初衷:對(duì)事件過(guò)濾處理。我們知道Channel(消息通道)負(fù)責(zé)事件傳遞,Subscription(訂閱)負(fù)責(zé)訂閱事件,通常這二者的模型如下:
這里就涉及到消息隊(duì)列和訂閱分發(fā)的實(shí)現(xiàn)。那么在In-memory Channel中如何實(shí)現(xiàn)的呢?
其實(shí) In-memory 的核心處理在Fanout Handler中,它負(fù)責(zé)將接收到的事件分發(fā)到不同的 Subscription。
In-memory Channel處理示意圖:
事件接收并分發(fā)核心代碼如下:
func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {return func(_ provisioners.ChannelReference, m *provisioners.Message) error {if f.config.AsyncHandler {go func() {// Any returned error is already logged in f.dispatch()._ = f.dispatch(m)}()return nil}return f.dispatch(m)} }當(dāng)前分發(fā)機(jī)制默認(rèn)是異步機(jī)制(可通過(guò)AsyncHandler參數(shù)控制分發(fā)機(jī)制)。
消息分發(fā)機(jī)制:
// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out // requests return successfully, then return nil. Else, return an error. func (f *Handler) dispatch(msg *provisioners.Message) error {errorCh := make(chan error, len(f.config.Subscriptions))for _, sub := range f.config.Subscriptions {go func(s eventingduck.SubscriberSpec) {errorCh <- f.makeFanoutRequest(*msg, s)}(sub)}for range f.config.Subscriptions {select {case err := <-errorCh:if err != nil {f.logger.Error("Fanout had an error", zap.Error(err))return err}case <-time.After(f.timeout):f.logger.Error("Fanout timed out")return errors.New("fanout timed out")}}// All Subscriptions returned err = nil.return nil }通過(guò)這里的代碼,我們可以看到分發(fā)處理超時(shí)機(jī)制。默認(rèn)為60s。也就是說(shuō)如果分發(fā)的請(qǐng)求響應(yīng)超過(guò)60s,那么In-memory會(huì)報(bào)錯(cuò):Fanout timed out。
Filter
一般的消息分發(fā)會(huì)將消息發(fā)送給訂閱的服務(wù),但在 Broker/Trigger 模型中需要對(duì)事件進(jìn)行過(guò)濾處理,這個(gè)處理的地方就是在Filter 中。
- 根據(jù)請(qǐng)求獲取Trigger信息。Filter中會(huì)根據(jù)請(qǐng)求的信息拿到 Trigger 名稱,然后通過(guò)獲取Trigger對(duì)應(yīng)的資源信息拿到過(guò)濾規(guī)則
- 根據(jù)Trigger 過(guò)濾規(guī)則進(jìn)行事件的過(guò)濾處理
- 最后將滿足過(guò)濾規(guī)則的分發(fā)到Kservice
其中過(guò)濾規(guī)則處理代碼如下:
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {if ts.Filter == nil || ts.Filter.SourceAndType == nil {r.logger.Error("No filter specified")ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))return false}// Record event count and filtering timestartTS := time.Now()defer func() {filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))}()filterType := ts.Filter.SourceAndType.Typeif filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}filterSource := ts.Filter.SourceAndType.Sources := event.Context.AsV01().SourceactualSource := s.String()if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))return false}ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))return true }當(dāng)前的機(jī)制是所有的訂閱事件都會(huì)通過(guò) Filter 集中進(jìn)行事件過(guò)濾,如果一個(gè)Broker有大量的訂閱Trigger,是否會(huì)給Filter帶來(lái)性能上的壓力? 這個(gè)在實(shí)際場(chǎng)景 Broker/Trigger 的運(yùn)用中需要考慮到這個(gè)問(wèn)題。
結(jié)論
作為內(nèi)置的默認(rèn)Channel實(shí)現(xiàn),In-memory 可以說(shuō)很好的完成了事件接收并轉(zhuǎn)發(fā)的使命,并且 Knative Eventing 在接下來(lái)的迭代中會(huì)支持部署時(shí)指定設(shè)置默認(rèn)的Channel。有興趣的同學(xué)可以持續(xù)關(guān)注一下。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: C语言动态内存管理和动态内存分配
- 下一篇: Linus 本尊来了!为什么 KubeC