日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【dubbo源码解析】 --- dubbo集群容错(cluster)、负载均衡(loadbalance)底层原理探析 + 扩展自己的集群容错、负载均衡组件

發布時間:2023/12/8 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【dubbo源码解析】 --- dubbo集群容错(cluster)、负载均衡(loadbalance)底层原理探析 + 扩展自己的集群容错、负载均衡组件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文對應源碼地址:https://github.com/nieandsun/dubbo-study


文章目錄

  • 1 集群容錯和負載均衡的概念
  • 2 dubbo集群容錯 + 負載均衡底層原理
  • 3 簡單測試
  • 4 自己擴展一個dubbo集群容錯組件和負載均衡組件
    • 4.1 擴展一個集群容錯組件(Cluster)
    • 4.2 擴展一個loadbalance組件
    • 4.3 不要忘了在MATE-INF/dubbo文件夾下指定這些SPI擴展組件
    • 4.3 測試


1 集群容錯和負載均衡的概念

摘自dubbo官方博客:http://dubbo.apache.org/zh-cn/blog/dubbo-loadbalance.html

摘自dubbo官方博客:http://dubbo.apache.org/zh-cn/blog/dubbo-loadbalance.html


2 dubbo集群容錯 + 負載均衡底層原理

首先應該明確的是集群容錯 + 負載均衡其實都是由消費端的邏輯。

由《【dubbo源碼解析】— dubbo的服務暴露+服務消費(RPC調用)底層原理深入探析》文章的知識可知,在不引入注冊中心時,在消費端, protocol.refer 得到 invoker 對象, 并拿著該Invoker做一個【可以調用服務端的服務】的代理對象作為@Reference標注的對象。

那加入注冊中心邏輯后,dubbo是如何在RegistryProtocol對象中將集群容錯功能掛入到Dubbo的RPC鏈條中的呢?

Dubbo 在這里玩了個心眼。 真正的過程走得百繞千回, 看如下這段代碼:
RegistryProtocol. doRefer 方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);//。。。。省略部分代碼Invoker invoker = cluster.join(directory);//。。。。省略部分代碼return invoker; }

原來的 protocol 被傳進RegistryDirectory 類中去了, doRefer 返回的 invoker對象, 是 cluster.join(directory)返回的 invoker。

而cluster 是一個擴展接口, 因此, 這個接口方法最終執行的對象, 是根據容錯策略自適配出來的對象, 如果 url 中不指定則默認是 failover 再看 failover 實現類的邏輯, 非常簡單, 只是返回一個FailoverClusterInvoker 對象:

public class FailoverCluster implements Cluster {public static final String NAME = "failover";public FailoverCluster() {}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker(directory);} }

再看下FailoverClusterInvoker中invoke方法調用后的的具體實現代碼doInvoke的邏輯:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {//...... (省略部分代碼)//容錯次數int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;if (len <= 0) {len = 1;}//...... (省略部分代碼)for(int i = 0; i < len; ++i) {//容錯策略//Reselect before retry to avoid a change of candidate `invokers`.//NOTE: if `invokers` changed, then `invoked` also lose accuracy.if (i > 0) { checkWhetherDestroyed();copyinvokers = list(invocation);// check againcheckInvokers(copyinvokers, invocation);}Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);//記錄已經被選擇的Invoker --> 用作進行負載判斷invoked.add(invoker);//...... (省略部分代碼)try {Result result = invoker.invoke(invocation);//...... (省略部分代碼)Result var12 = result;return var12;}//...... (省略部分代碼)}}

此 invoker 的邏輯:
(1)按重試次數 for 循環, 只要不是正常返回, 則再試一次
(2)調用 select 方法,大致邏輯為:先通過SPI機制取得一個 loadbalance 對象—>然后根據獲取的loadbalance 對象select出來一個封裝了【可以調用服務端的服務】的Invoker —> 然后執行此 invoker 對象得到結果。

需要注意的是:
此 select 方法, 是從一組 invoker(即文章《【dubbo源碼解析】— dubbo的服務注冊與發現機制底層原理探析》中講到的RegistryDirectory對象的urlInvokerMap容器 中緩存的根據每一個服務端的URL生成的Invoker對象) 中選擇出來的一個 invoker。

畫個簡圖來描述上訴具體邏輯如下:


3 簡單測試


注意: 如果消費端未配置集群容錯 + 負載均衡策略的話,消費端會通過注冊中心獲取到服務端配置的參數,也就是說這些參數服務端都會以URL的形式給注冊中心,然后消費端根據獲取到的URL來根據SPI機制選擇到底使用哪種策略。

服務端:

ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class); ExtensionLoader<ProxyFactory> proxyLoader = ExtensionLoader.getExtensionLoader(ProxyFactory.class);//注冊中心服務--zk final URL registryUrl = URL.valueOf("registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?registry=zookeeper");//支持的協議:dubbo,http,hessian,rmi URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:9001/com.nrsc.service.InvokerDemoService");@Test public void serverRpc() throws IOException {InvokerDemoService service = new InvokerDemoServiceImpl("yoyo");//生成代理工廠// --- 由URL確定到底是動態代理工廠(JdkProxyFactory)還是靜態代理工廠(JavassistProxyFactory)// --- 默認情況下為靜態代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//下面這兩句話完全可以放在外面 ---> 如果寫在外面,這里的代碼就和上文講到的RPC完整鏈條的代碼一致了//這里為了測試消費端可以動態監測到服務端的發布/下線,所以寫在了這里serviceUrl = serviceUrl.setPort(9001);//url中加入負載均衡和集群容錯參數//serviceUrl = serviceUrl.addParameter("loadbalance", "consistenthash");//serviceUrl = serviceUrl.addParameter("cluster", "failfast");//啟動自己擴展的loadbalance和cluster組件//serviceUrl = serviceUrl.addParameter("loadbalance", "first");//serviceUrl = serviceUrl.addParameter("cluster", "failsms");URL newRegistryUrl = registryUrl.addParameter(Constants.EXPORT_KEY, serviceUrl.toFullString());Invoker<InvokerDemoService> serviceInvoker = proxy.getInvoker(service, InvokerDemoService.class, newRegistryUrl);//獲取具體的協議Protocol protocol = protocolLoader.getAdaptiveExtension();Exporter<InvokerDemoService> exporter = protocol.export(serviceInvoker);System.out.println("server 啟動協議:" + serviceUrl.getProtocol());// 保證服務一直開著System.in.read();exporter.unexport(); }/***** 除了接口外,其他和serverRpc1()一樣,主要用來測試消費端可以動態監測到服務端的發布/下線* @throws IOException*/ @Test public void serverRpc2() throws IOException {InvokerDemoService service = new InvokerDemoServiceImpl("nrsc");//生成代理工廠// --- 由URL確定到底是動態代理工廠(JdkProxyFactory)還是靜態代理工廠(JavassistProxyFactory)// --- 默認情況下為靜態代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//下面這兩句話完全可以放在外面 ---> 如果寫在外面,這里的代碼就和上文講到的RPC完整鏈條的代碼一致了//這里為了測試消費端可以動態監測到服務端的發布/下線,所以寫在了這里serviceUrl = serviceUrl.setPort(9002);//url中加入負載均衡和集群容錯參數//serviceUrl = serviceUrl.addParameter("loadbalance", "consistenthash");//serviceUrl = serviceUrl.addParameter("cluster", "failfast");//啟動自己擴展的loadbalance和cluster組件//serviceUrl = serviceUrl.addParameter("loadbalance", "first");//serviceUrl = serviceUrl.addParameter("cluster", "failsms");URL newRegistryUrl = registryUrl.addParameter(Constants.EXPORT_KEY, serviceUrl.toFullString());Invoker<InvokerDemoService> serviceInvoker = proxy.getInvoker(service, InvokerDemoService.class, newRegistryUrl);//獲取具體的協議Protocol protocol = protocolLoader.getAdaptiveExtension();Exporter<InvokerDemoService> exporter = protocol.export(serviceInvoker);System.out.println("server 啟動協議:" + serviceUrl.getProtocol());// 保證服務一直開著System.in.read();exporter.unexport(); }

消費端:

@Test public void clientRpc() throws IOException {Protocol protocol = protocolLoader.getAdaptiveExtension();//生成代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//由代理工廠生成Invoker對象Invoker<InvokerDemoService> referInvoker = protocol.refer(InvokerDemoService.class, registryUrl);//生成DemoService的代理類InvokerDemoService service = proxy.getProxy(referInvoker);for (int i = 0; i < 6; i++) {String result = service.sayHello(registryUrl.getProtocol() + "調用");System.out.println(result);}// 保證服務一直開著 ,測試消費端可以動態監測到服務端的發布/下線//System.in.read(); }

測試結果1:
分別啟用兩個服務端,然后啟用消費端,在未配置集群容錯+ 負載均衡的情況下調用結果和結論如下:

測試結果2:
放開服務端代碼中關于集群容錯+ 負載均衡的注釋代碼,重新啟用兩個服務端,然后啟用消費端,調用結果和結論如下:


4 自己擴展一個dubbo集群容錯組件和負載均衡組件


4.1 擴展一個集群容錯組件(Cluster)

由dubbo源碼可知要想擴展一個集群容錯組件,需要做兩件事

  • (1)實現Cluster接口,并重寫其Join方法
  • (2)在Join方法里返回一個XXXClusterInvoker 用于包裝含有【可以調用服務端的服務】的Invoker ,且集群容錯的邏輯,就寫在XXXClusterInvoker 的Invoker方法里

這里為了簡便期間,我以 FailfastClusterInvoker作為XXXClusterInvoker ,則自定的Cluster代碼如下:

/**** 當然如果想新擴展一個Cluster組件,肯定還要配套弄一個XXXClusterInvoker,這里就直接使用FailfastClusterInvoker代替了*/ public class FailSmsCluster implements Cluster {@Overridepublic <T> Invoker<T> join(Directory<T> directory) throws RpcException {sendSms();return new FailfastClusterInvoker<>(directory);}private void sendSms() {System.out.println("send sms notify!");} }

4.2 擴展一個loadbalance組件

擴展loadbalance組件,要比擴展Cluster組件簡單的多,只需要實現LoadBalance接口,并重寫其select方法就可以了,這里提供一個比較簡單的loadbalance組件 — 只選擇第一個注冊的Invoker。

public class FirstLoadBalance implements LoadBalance {@Overridepublic <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {System.out.println("FirstLoadBalance : Select the first invoker...");return invokers.get(0);} }

4.3 不要忘了在MATE-INF/dubbo文件夾下指定這些SPI擴展組件

***

4.3 測試

打開服務端的如下代碼,并重啟兩個服務端 —>然后再重啟消費端

//啟動自己擴展的loadbalance和cluster組件serviceUrl = serviceUrl.addParameter("loadbalance", "first");serviceUrl = serviceUrl.addParameter("cluster", "failsms");

調用結果和結論如下:


end!

總結

以上是生活随笔為你收集整理的【dubbo源码解析】 --- dubbo集群容错(cluster)、负载均衡(loadbalance)底层原理探析 + 扩展自己的集群容错、负载均衡组件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。