Dubbo-go 源码笔记(二)客户端调用过程
作者 |?李志信
導讀:有了上一篇文章《Dubbo-go 源碼筆記(一)Server 端開啟服務過程》的鋪墊,可以類比客戶端啟動于服務端的啟動過程。其中最大的區別是服務端通過 zk 注冊服務,發布自己的ivkURL并訂閱事件開啟監聽;而客戶應該是通過zk注冊組件,拿到需要調用的serviceURL,更新invoker并重寫用戶的RPCService,從而實現對遠程過程調用細節的封裝。
配置文件和客戶端源代碼
1. client 配置文件
helloworld 提供的 demo:profiles/client.yaml。
registries :"demoZk":protocol: "zookeeper"timeout : "3s"address: "127.0.0.1:2181"username: ""password: "" references:"UserProvider":# 可以指定多個registry,使用逗號隔開;不指定默認向所有注冊中心注冊registry: "demoZk"protocol : "dubbo"interface : "com.ikurento.user.UserProvider"cluster: "failover"methods :- name: "GetUser"retries: 3可看到配置文件與之前討論過的 Server 端非常類似,其 refrences 部分字段就是對當前服務要主調的服務的配置,其中詳細說明了調用協議、注冊協議、接口 id、調用方法、集群策略等,這些配置都會在之后與注冊組件交互、重寫 ivk、調用的過程中使用到。
2. 客戶端使用框架源碼
user.go:
func init() {config.SetConsumerService(userProvider)hessian.RegisterPOJO(&User{}) }main.go:
func main() {hessian.RegisterPOJO(&User{})config.Load()time.Sleep(3e9)println("\n\n\nstart to test dubbo")user := &User{}err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)if err != nil {panic(err)}println("response result: %v\n", user)initSignal() }在官網提供的 helloworld demo 的源碼中,可看到與服務端類似,在 user.go 內注冊了 rpc-service,以及需要 rpc 傳輸的結構體 user。
在 main 函數中,同樣調用了 config.Load() 函數,之后就可以通過實現好的 rpc-service:userProvider 直接調用對應的功能函數,即可實現 rpc 調用。
可以猜到,從 hessian 注冊結構、SetConsumerService,到調用函數 .GetUser() 期間,用戶定義的 rpc-service 也就是 userProvider 對應的函數被重寫,重寫后的 GetUser 函數已經包含實現了遠程調用邏輯的 invoker。
接下來,就要通過閱讀源碼,看看 dubbo-go 是如何做到的。
實現遠程過程調用
1. 加載配置文件
// file: config/config_loader.go :Load()// Load Dubbo Init func Load() {// init routerinitRouter()// init the global event dispatcherextension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)// start the metadata report if config setif err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)return}// reference configloadConsumerConfig()在 main 函數中調用了 config.Load() 函數,進而調用了 loadConsumerConfig,類似于之前講到的 server 端配置讀入函數。
在 loadConsumerConfig 函數中,進行了三步操作:
// config/config_loader.go func loadConsumerConfig() {// 1 init other consumer configconConfigType := consumerConfig.ConfigTypefor key, value := range extension.GetDefaultConfigReader() {}checkApplicationName(consumerConfig.ApplicationConfig)configCenterRefreshConsumer()checkRegistries(consumerConfig.Registries, consumerConfig.Registry)// 2 refer-implement-referencefor key, ref := range consumerConfig.References {if ref.Generic {genericService := NewGenericService(key)SetConsumerService(genericService)}rpcService := GetConsumerService(key)ref.id = keyref.Refer(rpcService)ref.Implement(rpcService)}// 3 wait for invoker is available, if wait over default 3s, then panicfor {} }其中重要的就是 for 循環里面的引用和實例化,兩步操作,會在接下來展開討論。
至此,配置已經被寫入了框架。
2. 獲取遠程 Service URL,實現可供調用的 invoker
上述的 ref.Refer 完成的就是這部分的操作。
圖(一)
1)構造注冊 url
和 server 端類似,存在注冊 url 和服務 url,dubbo 習慣將服務 url 作為注冊 url 的 sub。
// file: config/reference_config.go: Refer() func (c *ReferenceConfig) Refer(_ interface{}) {//(一)配置url參數(serviceUrl),將會作為subcfgURL := common.NewURLWithOptions(common.WithPath(c.id),common.WithProtocol(c.Protocol),common.WithParams(c.getUrlMap()),common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),)...// (二)注冊地址可以通過url格式給定,也可以通過配置格式給定// 這一步的意義就是配置->提取信息生成URLif c.Url != "" {// 用戶給定url信息,可以是點對點的地址,也可以是注冊中心的地址// 1. user specified URL, could be peer-to-peer address, or register center's address.urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")for _, urlStr := range urlStrings {serviceUrl, err := common.NewURL(urlStr)...}} else {// 配置讀入注冊中心的信息// assemble SubURL from register center's configuration mode// 這是注冊url,protocol = registry,包含了zk的用戶名、密碼、ip等等c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)...// set url to regUrlsfor _, regUrl := range c.urls {regUrl.SubURL = cfgURL// regUrl的subURl存當前配置url}}//至此,無論通過什么形式,已經拿到了全部的regURL// (三)獲取registryProtocol實例,調用其Refer方法,傳入新構建好的regURLif len(c.urls) == 1 {// 這一步訪問到registry/protocol/protocol.go registryProtocol.Refer// 這里是registryc.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])} else {// 如果有多個注冊中心,即有多個invoker,則采取集群策略invokers := make([]protocol.Invoker, 0, len(c.urls))...}這個函數中,已經處理完從 Register 配置到 RegisterURL 的轉換,即圖(一)中部分:
接下來,已經拿到的 url 將被傳遞給 RegistryProtocol,進一步 refer。
2)registryProtocol 獲取到 zkRegistry 實例,進一步 Refer
// file: registry/protocol/protocol.go: Refer// Refer provider service from registry center // 拿到的是配置文件registries的url,他能夠生成一個invoker = 指向目的addr,以供客戶端直接調用。 func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {var registryUrl = url// 這里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等var serviceUrl = registryUrl.SubURLif registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry"protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")registryUrl.Protocol = protocol//替換成了具體的值,比如"zookeeper"}// 接口對象var reg registry.Registry// (一)實例化接口對象,緩存策略if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {// 緩存中不存在當前registry,新建一個regreg = getRegistry(®istryUrl)// 緩存起來proto.registries.Store(registryUrl.Key(), reg)} else {reg = regI.(registry.Registry)}// 到這里,獲取到了reg實例 zookeeper的registry//(二)根據Register的實例zkRegistry和傳入的regURL新建一個directory// 這一步存在復雜的異步邏輯,從注冊中心拿到了目的service的真實addr,獲取了invoker并放入directory,// 這一步將在下面詳細給出步驟// new registry directory for store service url from registrydirectory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg)if err != nil {logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",serviceUrl.String(), err.Error())return nil}// (三)DoRegister 在zk上注冊當前client serviceerr = reg.Register(*serviceUrl)if err != nil {logger.Errorf("consumer service %v register registry %v error, error message is %s",serviceUrl.String(), registryUrl.String(), err.Error())}// (四)new cluster invoker,將directory寫入集群,獲得具有集群策略的invokercluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))invoker := cluster.Join(directory)// invoker保存proto.invokers = append(proto.invokers, invoker)return invoker }可詳細閱讀上述注釋,這個函數完成了從 url 到 invoker 的全部過程:
(一)首先獲得 Registry 對象,默認是之前實例化的 zkRegistry,和之前 server 獲取 Registry 的處理很類似。
(二)通過構造一個新的 directory,異步拿到之前在 zk 上注冊的 server 端信息,生成 invoker。
(三)在 zk 上注冊當前 service。
(四)集群策略,獲得最終 invoker。
這一步完成了圖(一)中所有余下的絕大多數操作,接下來就需要詳細地查看 directory 的構造過程。
3)構造 directory(包含較復雜的異步操作)
圖(二)
上述的 extension.GetDefaultRegistryDirectory(®istryUrl, reg) 函數,本質上調用了已經注冊好的 NewRegistryDirectory 函數:
// file: registry/directory/directory.go: NewRegistryDirectory()// NewRegistryDirectory will create a new RegistryDirectory // 這個函數作為default注冊在extension上面 // url為注冊url,reg為zookeeper registry func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {if url.SubURL == nil {return nil, perrors.Errorf("url is invalid, suburl can not be nil")}dir := &RegistryDirectory{BaseDirectory: directory.NewBaseDirectory(url),cacheInvokers: []protocol.Invoker{},cacheInvokersMap: &sync.Map{},serviceType: url.SubURL.Service(),registry: registry,}dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)go dir.subscribe(url.SubURL)return dir, nil }首先構造了一個注冊 directory,開啟協程調用其 subscribe 函數,傳入 serviceURL。
這個 directory 目前包含了對應的 zkRegistry,以及傳入的 URL,它的 cacheInvokers 部分是空的。
進入 dir.subscribe(url.SubURL) 這個異步函數:
/ file: registry/directory/directory.go: subscribe()// subscribe from registry func (dir *RegistryDirectory) subscribe(url *common.URL) {// 增加兩個監聽,dir.consumerConfigurationListener.addNotifyListener(dir)dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)// subscribe調用dir.registry.Subscribe(url, dir) }重點來了,它調用了 zkRegistry 的 Subscribe 方法,與此同時將自己作為 ConfigListener 傳入。
我認為這種傳入 listener 的設計模式非常值得學習,而且很有 java 的味道。
針對等待 zk 返回訂閱信息這樣的異步操作,需要傳入一個 Listener,這個 Listener 需要實現 Notify 方法,進而在作為參數傳入內部之后,可以被異步地調用 Notify,將內部觸發的異步事件“傳遞出來”,再進一步處理加工。
層層的 Listener 事件鏈,能將傳入的原始 serviceURL 通過 zkConn 發送給 zk 服務,獲取到服務端注冊好的 url 對應的二進制信息。
而 Notify 回調鏈,則將這串 byte[] 一步一步解析、加工;以事件的形式向外傳遞,最終落到 directory 上的時候,已經是成型的 newInvokers 了。
具體細節不再以源碼形式展示,可參照上圖查閱源碼。
至此已經拿到了 server 端注冊好的真實 invoker。
完成了圖(一)中的部分:
4)構造帶有集群策略的 clusterinvoker
經過上述操作,已經拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 數組里面緩存。
后續的操作對應本文從 url 到 invoker 的過程的最后一步,由 directory 生成帶有特性集群策略的 invoker。
// (四)new cluster invoker,將directory寫入集群,獲得具有集群策略的invokercluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))invoker := cluster.Join(directory) 123Join 函數的實現就是如下函數:
// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {return &failoverClusterInvoker{baseClusterInvoker: newBaseClusterInvoker(directory),} } 12345dubbo-go 框架默認選擇 failover 策略,既然返回了一個 invoker,我們查看一下 failoverClusterInvoker 的 Invoker 方法,看它是如何將集群策略封裝到 Invoker 函數內部的:
// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()// Invoker 函數 func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {...//調用List方法拿到directory緩存的所有invokersinvokers := invoker.directory.List(invocation)if err := invoker.checkInvokers(invokers, invocation); err != nil {// 檢查是否可以實現調用return &protocol.RPCResult{Err: err}}// 獲取來自用戶方向傳入的methodName := invocation.MethodName()retries := getRetries(invokers, methodName)loadBalance := getLoadBalance(invokers[0], invocation)for i := 0; i <= retries; i++ {// 重要!這里是集群策略的體現,失敗后重試!//Reselect before retry to avoid a change of candidate `invokers`.//NOTE: if `invokers` changed, then `invoked` also lose accuracy.if i > 0 {if err := invoker.checkWhetherDestroyed(); err != nil {return &protocol.RPCResult{Err: err}}invokers = invoker.directory.List(invocation)if err := invoker.checkInvokers(invokers, invocation); err != nil {return &protocol.RPCResult{Err: err}}}// 這里是負載均衡策略的體現!選擇特定ivk進行調用。ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)if ivk == nil {continue}invoked = append(invoked, ivk)//DO INVOKEresult = ivk.Invoke(ctx, invocation)if result.Error() != nil {providers = append(providers, ivk.GetUrl().Key())continue}return result}... }看了很多 Invoke 函數的實現,所有類似的 Invoker 函數都包含兩個方向:一個是用戶方向的 invcation;一個是函數方向的底層 invokers。
而集群策略的 invoke 函數本身作為接線員,把 invocation 一步步解析,根據調用需求和集群策略,選擇特定的 invoker 來執行。
proxy 函數也是這樣,一個是用戶方向的 ins[] reflect.Type, 一個是函數方向的 invoker。
proxy 函數負責將 ins 轉換為 invocation,調用對應 invoker 的 invoker 函數,實現連通。
而出于這樣的設計,可以在一步步 Invoker 封裝的過程中,每個 Invoker 只關心自己負責操作的部分,從而使整個調用棧解耦。
妙啊!!!
至此,我們理解了 failoverClusterInvoker 的 Invoke 函數實現,也正是和這個集群策略 Invoker 被返回,接受來自上方的調用。
已完成圖(一)中的:
5)在 zookeeper 上注冊當前 client
拿到 invokers 后,可以回到這個函數了:
// file: config/refrence_config.go: Refer()if len(c.urls) == 1 {// 這一步訪問到registry/protocol/protocol.go registryProtocol.Referc.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])// (一)拿到了真實的invokers} else {// 如果有多個注冊中心,即有多個invoker,則采取集群策略invokers := make([]protocol.Invoker, 0, len(c.urls))...cluster := extension.GetCluster(hitClu)// If 'zone-aware' policy select, the invoker wrap sequence would be:// ZoneAwareClusterInvoker(StaticDirectory) ->// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invokerc.invoker = cluster.Join(directory.NewStaticDirectory(invokers))}// (二)create proxy,為函數配置代理if c.Async {callback := GetCallback(c.id)c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)} else {// 這里c.invoker已經是目的addr了c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)}我們有了可以打通的 invokers,但還不能直接調用,因為 invoker 的入參是 invocation,而調用函數使用的是具體的參數列表,需要通過一層 proxy 來規范入參和出參。
接下來新建一個默認 proxy,放置在 c.proxy 內,以供后續使用。
至此,完成了圖(一)中最后的操作:
3. 將調用邏輯以代理函數的形式寫入 rpc-service
上面完成了 config.Refer 操作,回到:
config/config_loader.go:?loadConsumerConfig()
下一個重要的函數是 Implement,它的操作較為簡單:旨在使用上面生成的 c.proxy 代理,鏈接用戶自己定義的 rpcService 到 clusterInvoker 的信息傳輸。
函數較長,只選取了重要的部分:
// file: common/proxy/proxy.go: Implement()// Implement // proxy implement // In consumer, RPCService like: // type XxxProvider struct { // Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error // } // Implement 實現的過程,就是proxy根據函數名和返回值,通過調用invoker 構造出擁有遠程調用邏輯的代理函數 // 將當前rpc所有可供調用的函數注冊到proxy.rpc內 func (p *Proxy) Implement(v common.RPCService) {// makeDubboCallProxy 這是一個構造代理函數,這個函數的返回值是func(in []reflect.Value) []reflect.Value 這樣一個函數// 這個被返回的函數是請求實現的載體,由他來發起調用獲取結果makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {return func(in []reflect.Value) []reflect.Value {// 根據methodName和outs的類型,構造這樣一個函數,這個函數能將in 輸入的value轉換為輸出的value// 這個函數具體的實現如下:...// 目前拿到了 methodName、所有入參的interface和value,出參數reply// (一)根據這些生成一個 rpcinvocationinv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),invocation_impl.WithArguments(inIArr),invocation_impl.WithReply(reply.Interface()),invocation_impl.WithCallBack(p.callBack),invocation_impl.WithParameterValues(inVArr))for k, value := range p.attachments {inv.SetAttachments(k, value)}// add user setAttachmentatm := invCtx.Value(constant.AttachmentKey) // 如果傳入的ctx里面有attachment,也要寫入invif m, ok := atm.(map[string]string); ok {for k, value := range m {inv.SetAttachments(k, value)}}// 至此構造inv完畢// (二)觸發Invoker 之前已經將cluster_invoker放入proxy,使用Invoke方法,通過getty遠程過程調用result := p.invoke.Invoke(invCtx, inv)// 如果有attachment,則加入if len(result.Attachments()) > 0 {invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())}...}}numField := valueOfElem.NumField()for i := 0; i < numField; i++ {t := typeOf.Field(i)methodName := t.Tag.Get("dubbo")if methodName == "" {methodName = t.Name}f := valueOfElem.Field(i)if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 針對于每個函數outNum := t.Type.NumOut()// 規定函數輸出只能有1/2個if outNum != 1 && outNum != 2 {logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",t.Name, t.Type.String(), outNum)continue}// The latest return type of the method must be error.// 規定最后一個返回值一定是errorif returnType := t.Type.Out(outNum - 1); returnType != typError {logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)continue}// 獲取到所有的出參類型,放到數組里var funcOuts = make([]reflect.Type, outNum)for i := 0; i < outNum; i++ {funcOuts[i] = t.Type.Out(i)}// do method proxy here:// (三)調用make函數,傳入函數名和返回值,獲得能調用遠程的proxy,將這個proxy替換掉原來的函數位置f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))logger.Debugf("set method [%s]", methodName)}}... }正如之前所說,proxy 的作用是將用戶定義的函數參數列表,轉化為抽象的 invocation 傳入 Invoker,進行調用。
其中已標明有三處較為重要的地方:
至此,也就解決了一開始的問題:
// file: client.go: main()config.Load()user := &User{}err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)這里直接調用用戶定義的 rpcService 的函數 GetUser,此處實際調用的是經過重寫入的函數代理,所以就能實現遠程調用了。
從 client 到 server 的 invoker 嵌套鏈- 小結
在閱讀 dubbo-go 源碼的過程中,我們能夠發現一條清晰的 invoker-proxy 嵌套鏈,希望能夠通過圖的形式來展現:
如果你有任何疑問,歡迎釘釘掃碼加入釘釘交流群:釘釘群號 23331795。
作者簡介
李志信?(GitHubID LaurenceLiZhixin),中山大學軟件工程專業在校學生,擅長使用 Java/Go 語言,專注于云原生和微服務等技術方向。
“阿里巴巴云原生關注微服務、Serverless、容器、Service Mesh 等技術領域、聚焦云原生流行技術趨勢、云原生大規模的落地實踐,做最懂云原生開發者的公眾號?!?/p> 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔為你收集整理的Dubbo-go 源码笔记(二)客户端调用过程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微服务框架 Go-Micro 集成 Na
- 下一篇: 阿里云 Serverless 再升级,从