生活随笔
收集整理的這篇文章主要介紹了
dubbo源码分析第七篇一服务暴露第三小节一远程暴露内核剖析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 服務暴露原理圖
- 多注冊中心遍歷暴露
- JavassistProxyFactory.getInvoker獲取可執行體
- Export
- RegistryProtocol.export
- RegistryProtocol.export一核心實現
- DubboProtocol.export
- 總結
- 擴展點一Wrapper實現
- 擴展點一 zk上的數據結構
- 擴展點一zk Listener 監聽實現機制
- 擴展點一獲取注冊中心客戶端對象
服務暴露原理圖
多注冊中心遍歷暴露
- 遍歷注冊中心進行服務暴露
- 構建Invoker,Invoker含(ref應用程序)目標對象和(避免反射從而提升效率的)Wrapper對象
- 通過RegisterURL(register://),進行spi發現,獲取協議實現類進行指定協議的服務暴露
- RegisterURL持有url.toFullString,也就是dubbo:// 協議url,在Spi選擇RegisterProtocol執行后再次調用protocol.export,此時傳入url為dubbo://協議再調用DubboProtocol進行服務暴露
- export執行鏈路基于SPI確定,具體鏈路如下
|
| 注冊協議 | Protocol$Adaptive>ProtocolFilterWrapper>ProtocolListenerWrapper>RegistryProtocol |
| dubbo協議 | Protocol$Adaptive>ProtocolFilterWrapper>ProtocolListenerWrapper>DubboProtocol |
if (CollectionUtils.isNotEmpty(registryURLs
)) {step
-1: 遍歷所有的注冊中心進行注冊 一個registryURLs大小為一
for (URL registryURL
: registryURLs
) {step
-1: 如果是本地協議的注冊中心 直接跳過
if (LOCAL_PROTOCOL
.equalsIgnoreCase(url
.getProtocol())) {continue;}...... 刪除url處理代碼、、、、、、 、、、、、、desc1
: 構建
Invoker對象
:Invoker持有ref對象和
Wrapper對象
;1 ref為我們定義的service對象
;2 Wrapper對象為動態代理生成對象desc2
: registryURL為注冊中心協議URL
;url
.toFullString()為服務提供者url desc3
: Invoker持有URL對象 于protocol
.export(wrapperInvoker
)時根據URL尋找對象的
Protocol實現類
[spi
]先通過
Spi根據registryURL調用
RegisterProtocol 再于
RegisterProtocol獲取url
.toFullString()Spi基于url
.toFullString()獲取
DubboProtocol協議進行暴露、、、、、、 、、、、、、
Invoker<?> invoker
= PROXY_FACTORY
.getInvoker(ref
, (Class) interfaceClass
, registryURL
.addParameterAndEncoded(EXPORT_KEY
, url
.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker
= new DelegateProviderMetaDataInvoker(invoker
, this);、、、、、、 、、、、、、通過wrapperInvoker的URL屬性查找protocol實現類調用鏈路為
Protocol$
Adaptive > ProtocolFilterWrapper > ProtocolListenerWrapper > RegistryProtocolProtocol$
Adaptive > ProtocolFilterWrapper > ProtocolListenerWrapper > DubboProtocol、、、、、、 、、、、、、
Exporter<?> exporter
= protocol
.export(wrapperInvoker
);exporters
.add(exporter
);}
}
JavassistProxyFactory.getInvoker獲取可執行體
public <T> Invoker<T> getInvoker(T proxy
, Class<T> type
, URL url
) {構建一個wrapper實例
[根據proxy構建
Wrapper源碼
,動態生成并編譯wrapper源碼
,生成
Class,利用clazz對象構建wrapper實例
]final Wrapper wrapper
= Wrapper.getWrapper(proxy
.getClass().getName().indexOf('$') < 0 ? proxy
.getClass() : type
);構建一個內部類 持有proxy以及wrapper對象wrapper對象invokeMethod內部編排了目標對象方法名與目標對象的方法調用邏輯
,從而避免反射
,提升效率
return new AbstractProxyInvoker<T>(proxy
, type
, url
) {@Overrideprotected Object doInvoke(T proxy
, String methodName
,Class<?>[] parameterTypes
,Object[] arguments
) throws Throwable {return wrapper
.invokeMethod(proxy
, methodName
, parameterTypes
, arguments
);}};
}
Export
- export第一階段:RegistryProtocol.export
- export第二階段:DubboProtocol.export
RegistryProtocol.export
- 注冊協議不調用構建filter鏈[dubbo過濾機制]
- 直接調用ProtocolListenerWrapper
public class ProtocolFilterWrapper implements Protocol {public <T> Exporter<T> export(Invoker<T> invoker
) throws RpcException {注冊協議直接調用
ProtocolListenerWrapperif (REGISTRY_PROTOCOL
.equals(invoker
.getUrl().getProtocol())) {return protocol
.export(invoker
);}return protocol
.export(buildInvokerChain(invoker
, SERVICE_FILTER_KEY
, CommonConstants.PROVIDER
));}
}
- 注冊協議不調用監聽機制
- 直接調用RegisterProtocol
public class ProtocolFilterWrapper implements Protocol {public <T> Exporter<T> export(Invoker<T> invoker
) throws RpcException {if (REGISTRY_PROTOCOL
.equals(invoker
.getUrl().getProtocol())) {return protocol
.export(invoker
);}非注冊協議構建監聽機制 監聽服務暴露和銷毀事件
return new ListenerExporterWrapper<T>(protocol
.export(invoker
),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker
.getUrl(), EXPORTER_LISTENER_KEY
)));}
}
RegistryProtocol.export一核心實現
- 調用dubboProtocol進行服務暴露
- 創建overrideSubscribeListener用于監聽zk服務端目錄變更
- 通過注冊中心對象向zk服務端注冊服務提供者URL
- 通過注冊中心配置訂閱URL和訂閱Listener用以訂閱Configurators下的override協議URL
public class RegistryProtocol implements Protocol {public <T> Exporter<T> export(final Invoker<T> originInvoker
) throws RpcException {URL registryUrl
= getRegistryUrl(originInvoker
);獲得服務提供者 URL
URL providerUrl
= getProviderUrl(originInvoker
);設置協議為provider category為configurators
final URL overrideSubscribeUrl
= getSubscribedOverrideUrl(providerUrl
);final OverrideListener overrideSubscribeListener
= new OverrideListener(overrideSubscribeUrl
, originInvoker
);overrideListeners
.put(overrideSubscribeUrl
, overrideSubscribeListener
);providerUrl
= overrideUrlWithConfig(providerUrl
, overrideSubscribeListener
);step
-1: 交由dubboProtocol 暴露服務 構建exporter 創建netty
final ExporterChangeableWrapper<T> exporter
= doLocalExport(originInvoker
, providerUrl
);final Registry registry
= getRegistry(originInvoker
);final URL registeredProviderUrl
= getUrlToRegistry(providerUrl
, registryUrl
);boolean register
= providerUrl
.getParameter(REGISTER_KEY
, true);if (register
) {step
-3: 向注冊中心注冊服務提供者(自己)第一步獲取注冊中心實例,第二步是向注冊中心注冊服務 注冊
Provider信息
register(registryUrl
, registeredProviderUrl
);}registerStatedUrl(registryUrl
, registeredProviderUrl
, register
);exporter
.setRegisterUrl(registeredProviderUrl
);exporter
.setSubscribeUrl(overrideSubscribeUrl
);step
-4: 訂閱 override協議URLregistry
.subscribe(overrideSubscribeUrl
, overrideSubscribeListener
);監聽器 這不是zk那個監聽變更 這是一個觀察監聽設計模式 一般暫無實現
notifyExport(exporter
);return new DestroyableExporter<>(exporter
);}
}
DubboProtocol.export
- 調用入口在RegistryProtocol.doLocalExport
- 通過ProtocolFilterWrapper構建filter鏈
- 通過ProtocolListenerWrapper構建監聽器,監聽服務暴露和取消暴露事件
- dubboProtocol完成exporter暴露
- 構建通信層[exchange transportor codec]借助netty4完成通信搭建
public <T> Exporter<T> export(Invoker<T> invoker
) throws RpcException {URL url
= invoker
.getUrl();key
= org.apache.dubbo.demo.DemoService:20880String key
= serviceKey(url
);服務暴露 通過
Invoker構建將暴露對象exporter并加入exporterMap將來通過netty找到exporterMap的相關
Invoker,在調用相關方法完成rpc接口調用
DubboExporter<T> exporter
= new DubboExporter<T>(invoker
, key
, exporterMap
);exporterMap
.put(key
, exporter
);Boolean isStubSupportEvent
= url
.getParameter(STUB_EVENT_KEY
, DEFAULT_STUB_EVENT
);Boolean isCallbackservice
= url
.getParameter(IS_CALLBACK_SERVICE
, false);if (isStubSupportEvent
&& !isCallbackservice
) {String stubServiceMethods
= url
.getParameter(STUB_EVENT_METHODS_KEY
);if (stubServiceMethods
== null || stubServiceMethods
.length() == 0) {if (logger
.isWarnEnabled()) {logger
.warn(new IllegalStateException("consumer [" + url
.getParameter(INTERFACE_KEY
) +"], has set stubproxy support event ,but no stub methods founded."));}}}這里的三層exchange transportor codec都是圍繞netty展開
,參見后文啟動服務器
openServer(url
);初始化序列化優化器 序列號
optimizeSerialization(url
);return exporter
;}
總結
- 講解了dubbo圍繞url展開的服務暴露
- 通過spi串聯了整個流程以及如何通過javassist避免了反射調用
- 其中與注冊中心的交互主要完成URL的注冊,配置URL的訂閱監聽
- 與dubbo協議的交互主要完成exporter暴露,以及通信層的構建
擴展點一Wrapper實現
所有的Wrapper動態類名為Wrapper+自增id$1,$2是javassist語法表示第幾個參數
| $w如果返回基本類型則改成包裝類型 否則不做處理 | wrapper通過javassist動態生成;最重要的方法是invokeMethod 將來在rpc調用時不在通過反射調用提升性能 |
public class Wrapper0 extends Wrapper {public static String[] pns
; public static Map pts
; public static String[] mns
; public static String[] dmns
;public static Class[] mts0
;public static Class[] mts1
;public static Class[] mtsn
;public String[] getPropertyNames(){ return pns
; }public boolean hasProperty(String n
){ return pts
.containsKey($
1); }public Class getPropertyType(String n
){ return (Class)pts
.get($
1); }public String[] getMethodNames(){ return mns
; }public String[] getDeclaredMethodNames(){ return dmns
; }public void setPropertyValue(Object o
, String n
, Object v
) {com.mockuai.ec.deliverycenter.common.api.DeliveryService w
;try {w
= ((com.mockuai.ec.deliverycenter.common.api.DeliveryService) $
1);} catch (Throwable e
) {throw new IllegalArgumentException(e
);}throw new org.apache.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $
2 + "\" field or setter method in class com.mockuai.ec.deliverycenter.common.api.DeliveryService.");}public Object getPropertyValue(Object o
, String n
) {com.mockuai.ec.deliverycenter.common.api.DeliveryService w
;try {w
= ((com.mockuai.ec.deliverycenter.common.api.DeliveryService) $
1);} catch (Throwable e
) {throw new IllegalArgumentException(e
);}throw new org.apache.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $
2 + "\" field or setter method in class com.mockuai.ec.deliverycenter.common.api.DeliveryService.");}public Object invokeMethod(Object o
, String n
, Class[] p
, Object[] v
) throws java.lang.reflect.InvocaionTargetException {com.mockuai.ec.deliverycenter.common.api.DeliveryService w
;try {w
= ((com.mockuai.ec.deliverycenter.common.api.DeliveryService) $
1);} catch (Throwable e
) {throw new IllegalArgumentException(e
);}try {if ("exportDeliveryInfoStatusDTOList".equals($
2) && $
3.length
== 2) {return ($w
) w
.exportDeliveryInfoStatusDTOList((com.mockuai.ec.deliverycenter.common.domain.qto.DeliveryAbnormalStatusInfoQTO) $
4[0], (com.mockuai.ec.deliverycenter.common.api.helper.CallerInfo) $
4[1]);}} catch (Throwable e
) {throw new java.lang.reflect.InvocationTargetException(e
);}throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $
2 + "\" in class com.mockuai.ec.deliverycenter.common.api.DeliveryService.");}
}
擴展點一 zk上的數據結構
根據url的category分成四類
- consumers存儲消費者url合集,協議為consumer://
- providers存儲提供者url合集,協議為dubbo://
- configurators存儲配置url合集,協議為override://
- routers存儲路由url合集,協議為route://
擴展點一zk Listener 監聽實現機制
- dubbo對zk的監聽有兩種實現OverrideListener和RegistryDirectory
- OverrideListener是服務提供者的監聽器 僅監聽配置目錄
- RegistryDirectory是服務消費者的監聽器 監聽配置,路由,服務提供者目錄
OverrideListener實現原理 ,先通過ZookeeperRegistry完成訂閱
ZookeeperRegistry 持有該overrideSubscribeListener
ZookeeperRegistry通過內部zkclient訂閱相關configurators
當zk服務器變更則通知ZookeeperRegistry,其根據內部持有的overrideSubscribeListener集合找到相應listener調用其notify
OverrideListener內持有originInvoker,會對originInvoker進行RegistryProtocol.reExport
- overrideSubscribeListener注冊監聽
registry
.subscribe(overrideSubscribeUrl
, overrideSubscribeListener
);
public class AbstractRegistry {public void subscribe(URL url
, NotifyListener listener
) {添加到監聽器合集
Set<NotifyListener> listeners
= subscribed
.computeIfAbsent(url
, n
-> new ConcurrentHashSet<>());listeners
.add(listener
);}
}public class ZookeeperRegistry {zk監聽器和
NotifyListener[overridelistener和消費者監聽器
]監聽器的映射
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
= new ConcurrentHashMap<>();public void doSubscribe(final URL url
, final NotifyListener listener
) {zkClient
.create(path
, false);List<String> children
= zkClient
.addChildListener(path
, zkListener
);創建后調用監聽器合集一次
notify(url
, listener
, urls
);}
}
擴展點一獲取注冊中心客戶端對象
ZookeeperRegistryFailbackRegistryAbstractRegistryRegistry
| 負責zk通信 | 負責失敗重試 | 通用功能 | 接口 |
- registryFactory獲取注冊中心
- registryUrl為zookeeper協議,獲取ZookeeperRegistryFactory
final Registry registry
= getRegistry(originInvoker
);
protected Registry getRegistry(final Invoker<?> originInvoker
) {URL registryUrl
= getRegistryUrl(originInvoker
);return registryFactory
.getRegistry(registryUrl
);
}
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {@Overridepublic Registry createRegistry(URL url
) {return new ZookeeperRegistry(url
, zookeeperTransporter
);}}
總結
以上是生活随笔為你收集整理的dubbo源码分析第七篇一服务暴露第三小节一远程暴露内核剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。