dubbo源码分析(3)
2019獨角獸企業重金招聘Python工程師標準>>>
繼上一章研究provider的加載過程之后,同理consumer的加載過程基本上和provider過程一模一樣。
同樣也是先讀取consumer.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://code.alibabatech.com/schema/dubbo
         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="zookeeper_privider" />
    <dubbo:consumer timeout="100000" filter="DataSourceKeyFilter"/>
    <dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />
    <dubbo:reference id="xxxService" interface="com.xxx.xx.service.xxxService" group="xyx"></dubbo:reference>
</beans>
Dubbo首先使用com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler注冊解析器,當spring解析xml配置文件時就會調用這些解析器生成對應的BeanDefinition交給spring管理:
/***?DubboNamespaceHandler*?*?@author?william.liangf*?@export*/ public?class?DubboNamespaceHandler?extends?NamespaceHandlerSupport?{static?{Version.checkDuplicate(DubboNamespaceHandler.class);}public?void?init()?{registerBeanDefinitionParser("application",?new?DubboBeanDefinitionParser(ApplicationConfig.class,?true));registerBeanDefinitionParser("module",?new?DubboBeanDefinitionParser(ModuleConfig.class,?true));registerBeanDefinitionParser("registry",?new?DubboBeanDefinitionParser(RegistryConfig.class,?true));registerBeanDefinitionParser("monitor",?new?DubboBeanDefinitionParser(MonitorConfig.class,?true));registerBeanDefinitionParser("provider",?new?DubboBeanDefinitionParser(ProviderConfig.class,?true));registerBeanDefinitionParser("consumer",?new?DubboBeanDefinitionParser(ConsumerConfig.class,?true));registerBeanDefinitionParser("protocol",?new?DubboBeanDefinitionParser(ProtocolConfig.class,?true));registerBeanDefinitionParser("service",?new?DubboBeanDefinitionParser(ServiceBean.class,?true));registerBeanDefinitionParser("reference",?new?DubboBeanDefinitionParser(ReferenceBean.class,?false));registerBeanDefinitionParser("annotation",?new?DubboBeanDefinitionParser(AnnotationBean.class,?true));}?Spring在初始化IOC容器時會利用這里注冊的BeanDefinitionParser的parse方法獲取對應的ReferenceBean的BeanDefinition實例,由于ReferenceBean實現了InitializingBean接口,在設置了bean的所有屬性后會調用afterPropertiesSet方法:
????public?void?afterPropertiesSet()?throws?Exception?{//如果Consumer還未注冊if?(getConsumer()?==?null)?{//獲取applicationContext這個IOC容器實例中的所有ConsumerConfigMap<String,?ConsumerConfig>?consumerConfigMap?=?applicationContext?==?null???null??:?BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,?ConsumerConfig.class,?false,?false);//如果IOC容器中存在這樣的ConsumerConfigif?(consumerConfigMap?!=?null?&&?consumerConfigMap.size()?>?0)?{ConsumerConfig?consumerConfig?=?null;//遍歷這些ConsumerConfigfor?(ConsumerConfig?config?:?consumerConfigMap.values())?{//如果用戶沒配置Consumer系統會生成一個默認Consumer,且它的isDefault返回ture//這里是說要么是Consumer是默認的要么是用戶配置的Consumer并且沒設置isDefault屬性if?(config.isDefault()?==?null?||?config.isDefault().booleanValue())?{//防止存在兩個默認Consumerif?(consumerConfig?!=?null)?{throw?new?IllegalStateException("Duplicate?consumer?configs:?"?+?consumerConfig?+?"?and?"?+?config);}//獲取默認ConsumerconsumerConfig?=?config;}}if?(consumerConfig?!=?null)?{//設置默認ConsumersetConsumer(consumerConfig);}}}//如果reference未綁定application且(reference未綁定consumer或referenc綁定的consumer沒綁定applicationif?(getApplication()?==?null&&?(getConsumer()?==?null?||?getConsumer().getApplication()?==?null))?{//獲取IOC中所有application的實例Map<String,?ApplicationConfig>?applicationConfigMap?=?applicationContext?==?null???null?:?BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,?ApplicationConfig.class,?false,?false);if?(applicationConfigMap?!=?null?&&?applicationConfigMap.size()?>?0)?{//如果IOC中存在applicationApplicationConfig?applicationConfig?=?null;//遍歷這些applicationfor?(ApplicationConfig?config?:?applicationConfigMap.values())?{//如果application是默認創建或者被指定成默認if?(config.isDefault()?==?null?||?config.isDefault().booleanValue())?{if?(applicationConfig?!=?null)?{throw?new?IllegalStateException("Duplicate?application?configs:?"?+?applicationConfig?+?"?and?"?+?config);}//獲取applicationapplicationConfig?=?config;}}if?(applicationConfig?!=?null)?{//關聯到referencesetApplication(applicationConfig);}}}//如果reference未綁定module且(reference未綁定c
onsumer或referenc綁定的consumer沒綁定moduleif?(getModule()?==?null&&?(getConsumer()?==?null?||?getConsumer().getModule()?==?null))?{//獲取IOC中所有module的實例Map<String,?ModuleConfig>?moduleConfigMap?=?applicationContext?==?null???null?:?BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,?ModuleConfig.class,?false,?false);if?(moduleConfigMap?!=?null?&&?moduleConfigMap.size()?>?0)?{ModuleConfig?moduleConfig?=?null;//遍歷這些modulefor?(ModuleConfig?config?:?moduleConfigMap.values())?{//如果module是默認創建或者被指定成默認if?(config.isDefault()?==?null?||?config.isDefault().booleanValue())?{if?(moduleConfig?!=?null)?{throw?new?IllegalStateException("Duplicate?module?configs:?"?+?moduleConfig?+?"?and?"?+?config);}//獲取modulemoduleConfig?=?config;}}if?(moduleConfig?!=?null)?{//關聯到referencesetModule(moduleConfig);}}}//如果reference未綁定注冊中心(Register)且(reference未綁定consumer或referenc綁定的consumer沒綁定注冊中心(Register)if?((getRegistries()?==?null?||?getRegistries().size()?==?0)&&?(getConsumer()?==?null?||?getConsumer().getRegistries()?==?null?||?getConsumer().getRegistries().size()?==?0)&&?(getApplication()?==?null?||?getApplication().getRegistries()?==?null?||?getApplication().getRegistries().size()?==?0))?{//獲取IOC中所有的注冊中心(Register)實例Map<String,?RegistryConfig>?registryConfigMap?=?applicationContext?==?null???null?:?BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,?RegistryConfig.class,?false,?false);if?(registryConfigMap?!=?null?&&?registryConfigMap.size()?>?0)?{List<RegistryConfig>?registryConfigs?=?new?ArrayList<RegistryConfig>();//遍歷這些registryfor?(RegistryConfig?config?:?registryConfigMap.values())?{//如果registry是默認創建或者被指定成默認if?(config.isDefault()?==?null?||?config.isDefault().booleanValue())?{registryConfigs.add(config);}}if?(registryConfigs?!=?null?&&?registryConfigs.size()?>?0)?{//關聯到reference,此處可以看出一個consumer可以綁定多個registry(注冊中心)super.setRegistries(registryConfigs);}}}//如果reference未綁定監控中心(Monitor)且(reference未綁定consumer或reference綁定的consumer沒綁定監控中心(Monitor)if?(getMonitor()?==?null&&?
(getConsumer()?==?null?||?getConsumer().getMonitor()?==?null)&&?(getApplication()?==?null?||?getApplication().getMonitor()?==?null))?{//獲取IOC中所有的監控中心(Monitor)實例Map<String,?MonitorConfig>?monitorConfigMap?=?applicationContext?==?null???null?:?BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,?MonitorConfig.class,?false,?false);if?(monitorConfigMap?!=?null?&&?monitorConfigMap.size()?>?0)?{MonitorConfig?monitorConfig?=?null;//遍歷這些監控中心(Monitor)for?(MonitorConfig?config?:?monitorConfigMap.values())?{//如果monitor是默認創建或者被指定成默認if?(config.isDefault()?==?null?||?config.isDefault().booleanValue())?{if?(monitorConfig?!=?null)?{throw?new?IllegalStateException("Duplicate?monitor?configs:?"?+?monitorConfig?+?"?and?"?+?config);}monitorConfig?=?config;}}if?(monitorConfig?!=?null)?{//關聯到reference,一個consumer綁定到一個監控中心(monitor)setMonitor(monitorConfig);}}}Boolean?b?=?isInit();if?(b?==?null?&&?getConsumer()?!=?null)?{b?=?getConsumer().isInit();}if?(b?!=?null?&&?b.booleanValue())?{//如果consumer已經被關聯則組裝ReferencegetObject();}} }?這步其實是Reference確認生成Invoker所需要的組件是否已經準備好,都準備好后我們進入生成Invoker的部分。這里的getObject會調用父類ReferenceConfig的init方法完成組裝:
首先ReferenceConfig類的init方法調用Protocol的refer方法生成Invoker實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把Invoker轉換為客戶端需要的接口。詳細代碼如下:
private?void?init()?{//避免重復初始化if?(initialized)?{return;}//置為已經初始化initialized?=?true;//如果interfaceName不存在if?(interfaceName?==?null?||?interfaceName.length()?==?0)?{throw?new?IllegalStateException("<dubbo:reference?interface=\"\"?/>?interface?not?allow?null!");}//?獲取消費者checkDefault();appendProperties(this);//如果未使用泛接口并且consumer已經準備好的情況下,reference使用和consumer一樣的泛接口if?(getGeneric()?==?null?&&?getConsumer()?!=?null)?{setGeneric(getConsumer().getGeneric());}//如果是泛接口那么interface的類型是GenericServiceif?(ProtocolUtils.isGeneric(getGeneric()))?{interfaceClass?=?GenericService.class;}?else?{//如果不是泛接口使用interfaceName指定的泛接口try?{interfaceClass?=?Class.forName(interfaceName,?true,?Thread.currentThread().getContextClassLoader());}?catch?(ClassNotFoundException?e)?{throw?new?IllegalStateException(e.getMessage(),?e);}//檢查接口以及接口中的方法都是否配置齊全checkInterfaceAndMethods(interfaceClass,?methods);}//如果服務比較多可以指定dubbo-resolve.properties文件配置service(service集中配置文件)String?resolve?=?System.getProperty(interfaceName);String?resolveFile?=?null;if?(resolve?==?null?||?resolve.length()?==?0)?{resolveFile?=?System.getProperty("dubbo.resolve.file");if?(resolveFile?==?null?||?resolveFile.length()?==?0)?{File?userResolveFile?=?new?File(new?File(System.getProperty("user.home")),?"dubbo-resolve.properties");if?(userResolveFile.exists())?{resolveFile?=?userResolveFile.getAbsolutePath();}}if?(resolveFile?!=?null?&&?resolveFile.length()?>?0)?{Properties?properties?=?new?Properties();FileInputStream?fis?=?null;try?{fis?=?new?FileInputStream(new?File(resolveFile));properties.load(fis);}?catch?(IOException?e)?{throw?new?IllegalStateException("Unload?"?+?resolveFile?+?",?cause:?"?+?e.getMessage(),?e);}?finally?{try?{if(null?!=?fis)?fis.close();}?catch?(IOException?e)?{logger.warn(e.getMessage(),?e);}}resolve?=?properties.getProperty(interfaceName);}}if?(resolve?!=?null?&&?resolve.length()?>?0)?{url?=?resolve;if?(logger.isWarnEnabled())?{if?(resolveFile?!=?null?&&?resolveFile.length()?>?0)?{logger.warn("Using?default?dubbo?re
solve?file?"?+?resolveFile?+?"?replace?"?+?interfaceName?+?""?+?resolve?+?"?to?p2p?invoke?remote?service.");}?else?{logger.warn("Using?-D"?+?interfaceName?+?"="?+?resolve?+?"?to?p2p?invoke?remote?service.");}}}//如果application、module、registries、monitor未配置則使用consumer的if?(consumer?!=?null)?{if?(application?==?null)?{application?=?consumer.getApplication();}if?(module?==?null)?{module?=?consumer.getModule();}if?(registries?==?null)?{registries?=?consumer.getRegistries();}if?(monitor?==?null)?{monitor?=?consumer.getMonitor();}}//如果module已關聯則關聯module的registries和monitorif?(module?!=?null)?{if?(registries?==?null)?{registries?=?module.getRegistries();}if?(monitor?==?null)?{monitor?=?module.getMonitor();}}//如果application已關聯則關聯application的registries和monitorif?(application?!=?null)?{if?(registries?==?null)?{registries?=?application.getRegistries();}if?(monitor?==?null)?{monitor?=?application.getMonitor();}}//檢查applicationcheckApplication();//檢查遠端和本地服務接口真實存在(是否可load)checkStubAndMock(interfaceClass);Map<String,?String>?map?=?new?HashMap<String,?String>();//配置dubbo的端屬性(是consumer還是provider)、版本屬性、創建時間、進程號Map<Object,?Object>?attributes?=?new?HashMap<Object,?Object>();map.put(Constants.SIDE_KEY,?Constants.CONSUMER_SIDE);map.put(Constants.DUBBO_VERSION_KEY,?Version.getVersion());map.put(Constants.TIMESTAMP_KEY,?String.valueOf(System.currentTimeMillis()));if?(ConfigUtils.getPid()?>?0)?{map.put(Constants.PID_KEY,?String.valueOf(ConfigUtils.getPid()));}if?(!?isGeneric())?{String?revision?=?Version.getVersion(interfaceClass,?version);if?(revision?!=?null?&&?revision.length()?>?0)?{map.put("revision",?revision);}String[]?methods?=?Wrapper.getWrapper(interfaceClass).getMethodNames();if(methods.length?==?0)?{logger.warn("NO?method?found?in?service?interface?"?+?interfaceClass.getName());map.put("methods",?Constants.ANY_VALUE);}else?{map.put("methods",?StringUtils.join(new?HashSet<String>(Arrays.asList(methods)),?","));}}map.put(Constants.INTERFACE_KEY,?interfaceName);//調用application、module、c
onsumer的get方法將屬性設置到map中appendParameters(map,?application);appendParameters(map,?module);appendParameters(map,?consumer,?Constants.DEFAULT_KEY);appendParameters(map,?this);String?prifix?=?StringUtils.getServiceKey(map);if?(methods?!=?null?&&?methods.size()?>?0)?{for?(MethodConfig?method?:?methods)?{appendParameters(map,?method,?method.getName());String?retryKey?=?method.getName()?+?".retry";if?(map.containsKey(retryKey))?{String?retryValue?=?map.remove(retryKey);if?("false".equals(retryValue))?{map.put(method.getName()?+?".retries",?"0");}}appendAttributes(attributes,?method,?prifix?+?"."?+?method.getName());checkAndConvertImplicitConfig(method,?map,?attributes);}}//attributes通過系統context進行存儲.StaticContext.getSystemContext().putAll(attributes);//在map裝載了application、module、consumer、reference的所有屬性信息后創建代理ref?=?createProxy(map);} private?T?createProxy(Map<String,?String>?map)?{URL?tmpUrl?=?new?URL("temp",?"localhost",?0,?map);final?boolean?isJvmRefer;if?(isInjvm()?==?null)?{if?(url?!=?null?&&?url.length()?>?0)?{?//指定URL的情況下,不做本地引用isJvmRefer?=?false;}?else?if?(InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl))?{//默認情況下如果本地有服務暴露,則引用本地服務.isJvmRefer?=?true;}?else?{isJvmRefer?=?false;}}?else?{isJvmRefer?=?isInjvm().booleanValue();}if?(isJvmRefer)?{URL?url?=?new?URL(Constants.LOCAL_PROTOCOL,?NetUtils.LOCALHOST,?0,?interfaceClass.getName()).addParameters(map);invoker?=?refprotocol.refer(interfaceClass,?url);if?(logger.isInfoEnabled())?{logger.info("Using?injvm?service?"?+?interfaceClass.getName());}}?else?{if?(url?!=?null?&&?url.length()?>?0)?{?//?用戶指定URL,指定的URL可能是對點對直連地址,也可能是注冊中心URLString[]?us?=?Constants.SEMICOLON_SPLIT_PATTERN.split(url);if?(us?!=?null?&&?us.length?>?0)?{for?(String?u?:?us)?{URL?url?=?URL.valueOf(u);if?(url.getPath()?==?null?||?url.getPath().length()?==?0)?{url?=?url.setPath(interfaceName);}if?(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol()))?{urls.add(url.addParameterAndEncoded(Constants.REFER_KEY,?StringUtils.toQueryString(map)));}?else?{urls.add(C
lusterUtils.mergeUrl(url,?map));}}}}?else?{?//?通過注冊中心配置拼裝URLList<URL>?us?=?loadRegistries(false);if?(us?!=?null?&&?us.size()?>?0)?{for?(URL?u?:?us)?{URL?monitorUrl?=?loadMonitor(u);if?(monitorUrl?!=?null)?{map.put(Constants.MONITOR_KEY,?URL.encode(monitorUrl.toFullString()));}urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,?StringUtils.toQueryString(map)));}}if?(urls?==?null?||?urls.size()?==?0)?{throw?new?IllegalStateException("No?such?any?registry?to?reference?"?+?interfaceName??+?"?on?the?consumer?"?+?NetUtils.getLocalHost()?+?"?use?dubbo?version?"?+?Version.getVersion()?+?",?please?config?<dubbo:registry?address=\"...\"?/>?to?your?spring?config.");}}if?(urls.size()?==?1)?{//此處舉例說明如果是Dubbo協議則調用DubboProtocol的refer方法生成invoker,當用戶調用service接口實際調用的是invoker的invoke方法invoker?=?refprotocol.refer(interfaceClass,?urls.get(0));}?else?{//多個service生成多個invokerList<Invoker<?>>?invokers?=?new?ArrayList<Invoker<?>>();URL?registryURL?=?null;for?(URL?url?:?urls)?{invokers.add(refprotocol.refer(interfaceClass,?url));if?(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol()))?{registryURL?=?url;?//?用了最后一個registry?url}}if?(registryURL?!=?null)?{?//?有?注冊中心協議的URL//?對有注冊中心的Cluster?只用?AvailableClusterURL?u?=?registryURL.addParameter(Constants.CLUSTER_KEY,?AvailableCluster.NAME);?invoker?=?cluster.join(new?StaticDirectory(u,?invokers));}??else?{?//?不是?注冊中心的URLinvoker?=?cluster.join(new?StaticDirectory(invokers));}}}Boolean?c?=?check;if?(c?==?null?&&?consumer?!=?null)?{c?=?consumer.isCheck();}if?(c?==?null)?{c?=?true;?//?default?true}if?(c?&&?!?invoker.isAvailable())?{throw?new?IllegalStateException("Failed?to?check?the?status?of?the?service?"?+?interfaceName?+?".?No?provider?available?for?the?service?"?+?(group?==?null???""?:?group?+?"/")?+?interfaceName?+?(version?==?null???""?:?":"?+?version)?+?"?from?the?url?"?+?invoker.getUrl()?+?"?to?the?consumer?"?+?NetUtils.getLocalHost()?+?"?use?dubbo?version?"?+?Version.getVersion());}if?(logger.isInfoEnabled())?{logger.info("Refer?dubbo?servi
ce?"?+?interfaceClass.getName()?+?"?from?url?"?+?invoker.getUrl());}//?創建服務代理return?(T)?proxyFactory.getProxy(invoker);}至此Reference在關聯了所有application、module、consumer、registry、monitor、service、protocol后調用對應Protocol類的refer方法生成InvokerProxy。當用戶調用service時dubbo會通過InvokerProxy調用Invoker的invoke的方法向服務端發起請求。客戶端就這樣完成了自己的初始化。
通觀全部dubbo代碼,有兩個很重要的對象就是Invoker和Exporter,Dubbo會根據用戶配置的協議調用不同協議的Invoker,再通過ReferenceConfig將Invoker的引用關聯到Reference的ref屬性上提供給消費端調用。當用戶調用一個Service接口的一個方法后,由于dubbo使用javassist動態代理,會調用Invoker的invoke方法,從而發起一次RPC調用,請求服務端的Service并返回結果。
轉載于:https://my.oschina.net/u/1034176/blog/662350
總結
以上是生活随笔為你收集整理的dubbo源码分析(3)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mongodb查询语句与Sql语句对比
- 下一篇: CentOS6.x下GitLab安装