日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

grpc 传递上下文_grpc 源码笔记 02:ClientConn

發布時間:2023/11/27 生活经验 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 grpc 传递上下文_grpc 源码笔记 02:ClientConn 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上篇筆記中梳理了一把 resolver 和 balancer,這里順著前面的流程走一遍入口的 ClientConn 對象。

ClientConn

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {ctx    context.Contextcancel context.CancelFunctarget       stringparsedTarget resolver.Targetauthority    stringdopts        dialOptionscsMgr        *connectivityStateManagerbalancerBuildOpts balancer.BuildOptionsblockingpicker    *pickerWrappermu              sync.RWMutexresolverWrapper *ccResolverWrappersc              *ServiceConfigconns           map[*addrConn]struct{}// Keepalive parameter can be updated if a GoAway is received.mkp             keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapperretryThrottler  atomic.ValuefirstResolveEvent *grpcsync.EventchannelzID int64 // channelz unique identification numberczData     *channelzData
}

首先是 ctx 和 cancel 兩個字段,之前好像有看到什么最佳實戰說不要把 context 字段放在 struct 里傳遞而要放在 func 里傳遞,但是這里確實屬于一個非常合理的場景:管理連接的生命周期,這個 ctx 和 cancel 都是來自建立連接時的 DialContext,標準庫的 net.Conn 的結構體中也有同樣的兩個字段,這樣請求上下文中建立的連接,可以在請求結束時安全釋放掉。ClientConn 中派生出的 goroutine,也能通過 cancel 函數安全地關閉掉。

target、parsedTarget、authority、dopts 似乎都屬于比較原始的參數。

csMgr 用于管理 ClientConn 總體的連接狀態,先放一下,后面詳細看。

resolverWrapper、conns、curBalancerName、balancerWrapper、firstResolveEvent 跟名字解析、負載均衡相關,上一篇筆記中簡單看過一點。retryThrottler 大約是重試的退避策略,還沒有了解過。

sc *ServiceConfig 是服務端給出的服務參數信息,大約是 maxRequestMessageBytes、timeout 之類的控制信息,可以具體到接口級別。mkp keepalive.ClientParameters 也是參數信息,與 keepalive 相關。

channelzID 和 czData 與 channelz 的信息相關,channelz 是 grpc 內部的一些埋點監控性質的信息,大體上是一個異步的 AddTraceEvent 然后匯聚數值,看代碼的時候應該可以忽略這部分。

ClientConn 與 resolverWrapper / balancerWrapper 的交互

clientConn 與 resolver / balancer 之間的交互在上一篇筆記中簡單梳理過,好處是接口比較明確,所以交互比較清晰。clientConn 與 resolverWrapper / balancerWrapper 之間的交互都是具體的方法,手工梳理一下。

resolverWrapper 對 clientConn 的調用有 updateResolverState。

clientConn 對 resolverWrapper 的調用有 resolveNow。

clientConn 對 balancerWrapper 的調用有:

  • resolveError:調用來自 clientConn 的 updateResolverState 方法,該方法是被 resolverWrapper 所調用的。
  • handleSubConnStateChange,調用來自 clientConn 的 handleSubConnStateChange 方法,該方法又是被 addrConn 的 updateConnectivityState 調用的。
  • updateClientConnState,調用來自 clientConn 的 updateResolverState,用于傳遞名字解析的更新。

balancerWrapper 對 clientConn 的調用有:

  • newAddrConn、removeAddrConn:大體上與 NewSubConn 和 RemoveSubConn 相映射,addrConn 是具體的 SubConn 的實現。
  • blockingPicker.updatePicker、csMgr.updateState:皆在 UpdateBalancerState 時調用,將 balancer.State 中的 picker 與總連接狀態設置給 clientConn。
  • resolveNow:來自 ResolveNow,向 clientConn 發起 resolver 的解析。

畫一張圖:

交互的過程感覺有點像 k8s 那種偵聽結構體的字段變動做收斂邏輯的意思,比如 resolver 給出后端地址、ServiceConfig、附加元信息的 State 結構體,ClientConn 跟 balancer 都拿這一個結構體中自己關心的字段做自己的邏輯,整個流程都異步做。

這張圖里只有 handleSubConnStateChange 的來源沒標注。它是來自 addrConn 的回調,后面再展開梳理。

ClientConn 的初始化

名字解析與負載均衡都是持續動態刷新的過程,那么整個流程是怎樣啟動的?裁剪一下 DialContext 函數:

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target:            target,csMgr:             &connectivityStateManager{},conns:             make(map[*addrConn]struct{}),dopts:             defaultDialOptions(),blockingpicker:    newPickerWrapper(),czData:            new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}// 好像是初始化什么鉤子chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}()if channelz.IsOn() {// ... 初始化 channelz}if !cc.dopts.insecure {// ... tlz 相關參數檢查}if cc.dopts.defaultServiceConfigRawJSON != nil {// ... 解析參數指定的默認 ServiceConfig 的 JSON}cc.mkp = cc.dopts.copts.KeepaliveParamsif cc.dopts.copts.Dialer == nil {// ... 默認 Dialer 函數}if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 配置 Dial 的超時if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}// 退出函數時,如果 DialContext 的 ctx 如果中途撤銷或者超時了,則返回 ctx.Err()defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()// 從 scChan 中偵聽接收 serviceConfig 信息scSet := falseif cc.dopts.scChan != nil {// Try to get an initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &scscSet = true}default:}}// 默認取指數退避if cc.dopts.bs == nil {cc.dopts.bs = backoff.DefaultExponential}// 根據名字的 Scheme 選擇 resolverBuilder// Determine the resolver to use.cc.parsedTarget = parseTarget(cc.target)grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)if resolverBuilder == nil {// .. 如果沒有找到則按默認的 resolverBuilder}creds := cc.dopts.copts.TransportCredentials// ..  初始化 cc.authority// 阻塞等待 scChanif cc.dopts.scChan != nil && !scSet {// Blocking wait for the initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &sc}case <-ctx.Done():return nil, ctx.Err()}}if cc.dopts.scChan != nil {go cc.scWatcher()}// 初始化 balancervar credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}cc.balancerBuildOpts = balancer.BuildOptions{DialCreds:        credsClone,CredsBundle:      cc.dopts.copts.CredsBundle,Dialer:           cc.dopts.copts.Dialer,ChannelzParentID: cc.channelzID,Target:           cc.parsedTarget,}// Build the resolver.rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}return cc, nil
}

cc.dopts.scChan 這里有一些邏輯,再就是在 dopts.block 時,有主動等連接的邏輯。

順著 cc.dopts.scChan 找過去,發現參數定義的 dialoptions 里面有這一段:

// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
// Deprecated: service config should be received through name resolver or via
// WithDefaultServiceConfig, as specified at
// <https://github.com/grpc/grpc/blob/master/doc/service_config.md>.  Will be
// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {return newFuncDialOption(func(o *dialOptions) {o.scChan = c})
}

說 scChan 這個字段要廢棄了,要么換 WithDefaultServiceConfig 傳一個默認的 json,要么通過 resolver 的 UpdateState 中 State 結構體里的 ServiceConfig 字段去動態拿。

ServiceConfig 比想象中更神通廣大一點,ClientConn 中有個 applyServiceConfigAndBalancer 方法,甚至會根據動態下發的 ServiceConfig 來調用 switchBalancer 動態切換 balancer 策略。

csMgr 與 WaitForStateChange

回去單獨看一下 cc.dopts.block 的邏輯:

// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}

大約是一個死循環連接狀態直到 Ready 為止,ClientConn 的連接狀態來自 cc.csMgr 做管理,而 csMgr 中的連接狀態來自 balancer 對 ClientConn 的 UpdateState 的回調。balancer 的連接狀態是對多個連接的連接狀態的匯聚,大約是只要有一個連接 Ready,便將 balancer 的連接狀態視為 Ready。之前看 balancer 做匯聚連接狀態還不大清楚這個的用處,現在看應該主要是為 WaitForStateChange 這個方法服務的,而且這個方法是公共方法,是 ClientConn 的對外 API。

工程上如果開啟 cc.dopts.block,似乎配合一個 cc.dopts.timeout 比較好,這樣能超時退出。

csMgr 主要做的事情是輔助 ClientConn 實現 connectivity.Reporter 接口,尤其是 WaitForStateChange 方法:

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {mu         sync.Mutexstate      connectivity.StatenotifyChan chan struct{}channelzID int64
}// ...// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = stateif channelz.IsOn() {// ...}if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {csm.mu.Lock()defer csm.mu.Unlock()if csm.notifyChan == nil {csm.notifyChan = make(chan struct{})}return csm.notifyChan
}// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {ch := cc.csMgr.getNotifyChan()if cc.csMgr.getState() != sourceState {return true}select {case <-ctx.Done():return falsecase <-ch:return true}
}

notifyChan 這個 channel 僅通過 close 做廣播性的通知。每當 state 狀態變化會惰性產生新的 notifyChan,當這個 notifyChan 被關閉時就意味著狀態有變化了,起到一個類似條件變量的作用。

blockingpicker

除了 balancerWrapper、resolverWrapper,ClientConn 中還有一個 pickerWrapper 類型的 blockingPicker 字段,本體也是同樣主要是并發同步為主。

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {mu         sync.Mutexdone       boolblockingCh chan struct{}picker     balancer.V2Picker// The latest connection error.  TODO: remove when V1 picker is deprecated;// balancer should be responsible for providing the error.*connErr
}type connErr struct {mu  sync.Mutexerr error
}

大約是初始化時生成一個 blockingCh,隨后每當 updatePickerV2 改動 picker 時,則關閉舊 blockingCh 同時生成一個新的 blockingCh。

pickerWrapper 對外的主要功能入口是 pick 方法,先看它的注釋:

// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {// ...

這些阻塞唯有 balancer 生成新的 picker 對象交給 ClientConn 才能解除。實現風格上,與 WaitForStateChange 類似,每當狀態變化時關閉舊 chan、生成新 chan,上鎖確保狀態變化與更替 chan 兩步操作的原子性,對方阻塞等待 chan 的關閉。

picker.Pick() 方法本身是線程安全的,不是很清楚每個 SubConn 能否被多個 goroutine 使用,后面再確認一下這點。

先看到這里,下面是 addrConn,也就是 SubConn 的實現。

總結

以上是生活随笔為你收集整理的grpc 传递上下文_grpc 源码笔记 02:ClientConn的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。