日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java调用dubbo服务器_dubbo源码分析-服务端注册流程-笔记

發布時間:2023/12/10 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java调用dubbo服务器_dubbo源码分析-服务端注册流程-笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前面,我們已經知道,基于spring這個解析入口,到發布服務的過程,接著基于DubboProtocol去發布,最終調用Netty的api創建了一個NettyServer。

那么繼續沿著RegistryProtocol.export這個方法,來看看注冊服務的代碼:

RegistryProtocol.export

public Exporter export(final Invoker originInvoker) throws RpcException {

//export invoker

final ExporterChangeableWrapper exporter = doLocalExport(originInvoker); //發布本地服務

//registry provider

final Registry registry = getRegistry(originInvoker);

final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

registry.register(registedProviderUrl);

// 訂閱override數據

// FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導致訂閱信息覆蓋。

final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);

final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);

overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

//保證每次export都返回一個新的exporter實例

return new Exporter() {

public Invoker getInvoker() {

return exporter.getInvoker();

}

public void unexport() {

try {

exporter.unexport();

} catch (Throwable t) {

logger.warn(t.getMessage(), t);

}

try {

registry.unregister(registedProviderUrl);

} catch (Throwable t) {

logger.warn(t.getMessage(), t);

}

try {

overrideListeners.remove(overrideSubscribeUrl);

registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);

} catch (Throwable t) {

logger.warn(t.getMessage(), t);

}

}

};

}

getRegistry

這個方法是invoker的地址獲取registry實例

/**

* 根據invoker的地址獲取registry實例

* @param originInvoker

* @return

*/

private Registry getRegistry(final Invoker> originInvoker){

URL registryUrl = originInvoker.getUrl(); //獲得registry://192.168.11.156:2181的協議地址

if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {

//得到zookeeper的協議地址

String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);

//registryUrl就會變成了zookeeper://192.168.11.156

registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);

}

//registryFactory是什么?

return registryFactory.getRegistry(registryUrl);

}

registryFactory.getRegistry

這段代碼很明顯了,通過前面這段代碼的分析,其實就是把registry的協議頭改成服務提供者配置的協議地址,也就是我們配置的

然后registryFactory.getRegistry的目的,就是通過協議地址匹配到對應的注冊中心。

那registryFactory是一個什么樣的對象呢?我們找一下這個代碼的定義

private RegistryFactory registryFactory;

public void setRegistryFactory(RegistryFactory registryFactory) {

this.registryFactory = registryFactory;

}

這個代碼有點眼熟,再來看看RegistryFactory這個類的定義,我猜想一定是一個擴展點,不信,咱們看

并且,大家還要注意這里面的一個方法上,有一個@Adaptive的注解,說明什么? 這個是一個自適應擴展點。

按照我們之前看過代碼,自適應擴展點加在方法層面上,表示會動態生成一個自適應的適配器。

所以這個自適應適配器應該是RegistryFactory$Adaptive

@SPI("dubbo")

public interface RegistryFactory {

/**

* 連接注冊中心.

*

* 連接注冊中心需處理契約:

* 1. 當設置check=false時表示不檢查連接,否則在連接不上時拋出異常。

* 2. 支持URL上的username:password權限認證。

* 3. 支持backup=10.20.153.10備選注冊中心集群地址。

* 4. 支持file=registry.cache本地磁盤文件緩存。

* 5. 支持timeout=1000請求超時設置。

* 6. 支持session=60000會話超時或過期設置。

*

* @param url 注冊中心地址,不允許為空

* @return 注冊中心引用,總不返回空

*/

@Adaptive({"protocol"})

Registry getRegistry(URL url);

}

RegistryFactory$Adaptive

我們拿到這個動態生成的自適應擴展點,看看這段代碼里面的實現

從url中拿到協議頭信息,這個時候的協議頭是zookeeper://

通過ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(“zookeeper”)去獲得一個指定的擴展點,而這個擴展點的配置在

dubbo-registry-zookeeper/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory。

得到一個ZookeeperRegistryFactory

public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {

public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {

if (arg0 == null) throw new IllegalArgumentException("url == null");

com.alibaba.dubbo.common.URL url = arg0;

String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());

if (extName == null)

throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) " +

"name from url(" + url.toString() + ") use keys([protocol])");

com.alibaba.dubbo.registry.RegistryFactory extension =

(com.alibaba.dubbo.registry.RegistryFactory)

ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).

getExtension(extName);

return extension.getRegistry(arg0);

}

}

ZookeeperRegistryFactory

這個方法中并沒有getRegistry方法,而是在父類AbstractRegistryFactory

從緩存REGISTRIES中,根據key獲得對應的Registry

如果不存在,則創建Registry

public Registry getRegistry(URL url) {

url = url.setPath(RegistryService.class.getName())

.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())

.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);

String key = url.toServiceString();

// 鎖定注冊中心獲取過程,保證注冊中心單一實例

LOCK.lock();

try {

Registry registry = REGISTRIES.get(key);

if (registry != null) {

return registry;

}

registry = createRegistry(url);

if (registry == null) {

throw new IllegalStateException("Can not create registry " + url);

}

REGISTRIES.put(key, registry);

return registry;

} finally {

// 釋放鎖

LOCK.unlock();

}

}

createRegistry

創建一個注冊中心,這個是一個抽象方法,具體的實現在對應的子類實例中實現的,在ZookeeperRegistryFactory中

public Registry createRegistry(URL url) {

return new ZookeeperRegistry(url, zookeeperTransporter);

}

通過zkClient,獲得一個zookeeper的連接實例

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {

super(url);

if (url.isAnyHost()) {

throw new IllegalStateException("registry address == null");

}

String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);

if (! group.startsWith(Constants.PATH_SEPARATOR)) {

group = Constants.PATH_SEPARATOR + group;

}

this.root = group; //設置根節點

zkClient = zookeeperTransporter.connect(url);//建立連接

zkClient.addStateListener(new StateListener() {

public void stateChanged(int state) {

if (state == RECONNECTED) {

try {

recover();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

});

}

代碼分析到這里,我們對于getRegistry得出了一個結論,根據當前注冊中心的配置信息,獲得一個匹配的注冊中心,也就是ZookeeperRegistry

registry.register(registedProviderUrl);

繼續往下分析,會調用registry.register去將dubbo://的協議地址注冊到zookeeper上

這個方法會調用FailbackRegistry類中的register. 為什么呢?

因為ZookeeperRegistry這個類中并沒有register這個方法,但是他的父類FailbackRegistry中存在register方法,而這個類又重寫了AbstractRegistry類中的register方法。

所以我們可以直接定位大FailbackRegistry這個類中的register方法中

FailbackRegistry.register

FailbackRegistry,從名字上來看,是一個失敗重試機制

調用父類的register方法,講當前url添加到緩存集合中

調用doRegister方法,這個方法很明顯,是一個抽象方法,會由ZookeeperRegistry子類實現。

@Override

public void register(URL url) {

super.register(url);

failedRegistered.remove(url);

failedUnregistered.remove(url);

try {

// 向服務器端發送注冊請求

doRegister(url);

} catch (Exception e) {

Throwable t = e;

// 如果開啟了啟動時檢測,則直接拋出異常

boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)

&& url.getParameter(Constants.CHECK_KEY, true)

&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());

boolean skipFailback = t instanceof SkipFailbackWrapperException;

if (check || skipFailback) {

if(skipFailback) {

t = t.getCause();

}

throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);

} else {

logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);

}

// 將失敗的注冊請求記錄到失敗列表,定時重試

failedRegistered.add(url);

}

}

ZookeeperRegistry.doRegister

終于找到你了,調用zkclient.create在zookeeper中創建一個節點。

protected void doRegister(URL url) {

try {

zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

} catch (Throwable e) {

throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

}

}

RegistryProtocol.export 這個方法中后續的代碼就不用再分析了。

就是去對服務提供端去注冊一個zookeeper監聽,當監聽發生變化的時候,服務端做相應的處理。

在register 方法里面,調用subscribe 方法,訂閱注冊中心變化

/**

* 訂閱符合條件的已注冊數據,當有注冊數據變更時自動推送.

*

* 訂閱需處理契約:

* 1. 當URL設置了check=false時,訂閱失敗后不報錯,在后臺定時重試。

* 2. 當URL設置了category=routers,只通知指定分類的數據,多個分類用逗號分隔,并允許星號通配,表示訂閱所有分類數據。

* 3. 允許以interface,group,version,classifier作為條件查詢,如:interface=com.alibaba.foo.BarService&version=1.0.0

* 4. 并且查詢條件允許星號通配,訂閱所有接口的所有分組的所有版本,或:interface=*&group=*&version=*&classifier=*

* 5. 當注冊中心重啟,網絡抖動,需自動恢復訂閱請求。

* 6. 允許URI相同但參數不同的URL并存,不能覆蓋。

* 7. 必須阻塞訂閱過程,等第一次通知完后再返回。

*

* @param url 訂閱條件,不允許為空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin

* @param listener 變更事件監聽器,不允許為空

*/

void subscribe(URL url, NotifyListener listener);

subscribe ->doSubscribe ->notify ->

protected void notify(URL url, NotifyListener listener, List urls) {

if (url == null) {

throw new IllegalArgumentException("notify url == null");

}

if (listener == null) {

throw new IllegalArgumentException("notify listener == null");

}

if ((urls == null || urls.size() == 0)

&& ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {

logger.warn("Ignore empty notify urls for subscribe url " + url);

return;

}

if (logger.isInfoEnabled()) {

logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);

}

Map> result = new HashMap>();

for (URL u : urls) {

if (UrlUtils.isMatch(url, u)) {

String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

List categoryList = result.get(category);

if (categoryList == null) {

categoryList = new ArrayList();

result.put(category, categoryList);

}

categoryList.add(u);

}

}

if (result.size() == 0) {

return;

}

Map> categoryNotified = notified.get(url);

if (categoryNotified == null) {

notified.putIfAbsent(url, new ConcurrentHashMap>());

categoryNotified = notified.get(url);

}

// 第一次主動調用 notify

// 對 /router /providers /configerations 路徑下的變更 進行notify

//后續(zookeeper watcher 機制)

for (Map.Entry> entry : result.entrySet()) {

String category = entry.getKey();

List categoryList = entry.getValue();

categoryNotified.put(category, categoryList);

saveProperties(url);

listener.notify(categoryList);

}

}

總結

以上是生活随笔為你收集整理的java调用dubbo服务器_dubbo源码分析-服务端注册流程-笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。