日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

万字长文浅析SpringCould微服务负载均衡框架Ribbon源码(字多慎入)

發(fā)布時間:2025/3/18 javascript 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 万字长文浅析SpringCould微服务负载均衡框架Ribbon源码(字多慎入) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

版本

作者:韓數(shù)

Github:github.com/hanshuaikan…

完成日期:2019-06-16日

jdk:1.8

springboot版本:2.1.3.RELEASE

SpringCould版本:Greenwich.SR1

聲明:

身為一個剛?cè)腴T的計算機(jī)菜佬,閱讀源碼自然離不開優(yōu)秀參考書籍和視頻的引導(dǎo),本篇文章的分析過程中"嚴(yán)重"借鑒了 翟永超 前輩的《SpringCloud微服務(wù)實戰(zhàn)》這本書籍,在這里也向準(zhǔn)備學(xué)習(xí)微服務(wù)的小伙伴們強(qiáng)烈推薦這本書,大家可以把這篇文章理解為《SpringCloud微服務(wù)實戰(zhàn)》Ribbon部分的精簡版和電子版,因為個人水平的原因,很多問題不敢妄下定論,以免誤人子弟,所有書上很多內(nèi)容都是精簡過后直接放上去的,由于SpringCloud已經(jīng)迭代到了Greenwich.SR1版本,Ribbon也和書上有了略微的差別,本篇文章的源碼采用的是Ribbon最新版本,同時,因為時間原因,有很多額外的子類實現(xiàn)并沒有完全顧上,例如PredicateBasedRule類的ZoneAvoidanceRule和AvailabilityFilteringRule 感興趣的讀者可以買《SpringCloud微服務(wù)實戰(zhàn)》這本書細(xì)看,同時強(qiáng)烈推薦小馬哥的微服務(wù)直播課系列《小馬哥微服務(wù)實戰(zhàn)》。

致謝

翟永超:博客地址:

blog.didispace.com/aboutme/

小馬哥: Java 微服務(wù)實踐 - Spring Boot / Spring Cloud購買鏈接:

segmentfault.com/ls/16500000…

電子版及相關(guān)代碼下載(歡迎Star)

Github:github.com/hanshuaikan…

微信公眾號:碼上marson

快速上手:

配置負(fù)載均衡

當(dāng)使用Eureka時,須做如下配置

## 服務(wù)提供方 spring.application.name = spring-cloud-ribbon-client### 服務(wù)端口 server.port = 8080### 管理安全失效 management.endpoints.web.exposure.include=*### 暫時性關(guān)閉 Eureka 注冊 ## 當(dāng)使用 Eureka 服務(wù)發(fā)現(xiàn)時,請注釋掉一下配置 # eureka.client.enabled = false## 連接 Eureka Sever eureka.client.serviceUrl.defaultZone = http://localhost:10000/eureka/ eureka.client.registryFetchIntervalSeconds = 5### 服務(wù)提供方主機(jī) serivce-provider.host = localhost ### 服務(wù)提供方端口 serivce-provider.port = 9090serivce-provider.name = spring-cloud-service-provider復(fù)制代碼

當(dāng)不適用Eureka的時候,需要配置如下

### 配置ribbon 服務(wù)地提供方 ## 當(dāng)使用 Eureka 服務(wù)發(fā)現(xiàn)時,請注釋掉一下配置 # spring-cloud-service-provider.ribbon.listOfServers = \ #http://${serivce-provider.host}:${serivce-provider.port} 復(fù)制代碼

激活負(fù)載均衡

@SpringBootApplication @RibbonClients({@RibbonClient(name = "spring-cloud-service-provider") }) @EnableDiscoveryClient public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}//聲明 RestTemplate@LoadBalanced // RestTemplate 的行為變化@Beanpublic RestTemplate restTemplate(){return new RestTemplate();}} 復(fù)制代碼

測試發(fā)送請求

return restTemplate.postForObject("http://" +serviceProviderName +"/greeting",user, String.class); 復(fù)制代碼

初探Ribbon源碼

LoadBalancerClient 類

在Spring 中 ,當(dāng)服務(wù)消費端去調(diào)用服務(wù)提供者的服務(wù)的時候,已經(jīng)封裝了一個模板類,叫做RestTemplate.那么Ribbon 又是如何通過RestTemplate來實現(xiàn)負(fù)載均衡的呢?

線索@LoadBalanced 注解:

# Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.注釋,用于標(biāo)記要配置為使用LoadBalancerClient的RestTemplate bean。 復(fù)制代碼

ServiceInstanceChooser接口

public interface ServiceInstanceChooser {/*** 從LoadBalancer中為指定的服務(wù)選擇一個ServiceInstance。* @param serviceId是查找LoadBalancer的服務(wù)ID。* @return 一個與serviceId匹配的ServiceInstance。*/ServiceInstance choose(String serviceId);} 復(fù)制代碼

ServiceInstance choose(String serviceId) :根據(jù)serviceId 去選擇一個對應(yīng)服務(wù)的實例

LoadBalancerClient

LoadBalancerClient 代碼:

package org.springframework.cloud.client.loadbalancer;import java.io.IOException; import java.net.URI; import org.springframework.cloud.client.ServiceInstance;public interface LoadBalancerClient extends ServiceInstanceChooser {<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;URI reconstructURI(ServiceInstance instance, URI original); } 復(fù)制代碼

T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request)

使用指定的LoadBalancer中的ServiceInstance執(zhí)行請求

**serviceInstance ** : 要執(zhí)行請求的服務(wù)

T execute(String serviceId, LoadBalancerRequest request) :

使用從負(fù)載均衡器中挑選出來的服務(wù)實例來執(zhí)行請求內(nèi)容。

URI reconstructURI(ServiceInstance instance, URI original);

返回一個 一 個 host:port 形式的URL對象用于我們最后像服務(wù)端發(fā)送請求的地址。而具體的host,port等信息

則從 instance參數(shù)中獲取。

ServiceInstance 類

public interface ServiceInstance {default String getInstanceId() {return null;}String getServiceId();String getHost();int getPort();boolean isSecure();URI getUri();Map<String, String> getMetadata();default String getScheme() {return null;} } 復(fù)制代碼

LoadBalancerAutoConfiguration類

作用:Ribbon 的自動化配置類代碼(部分):

@Configuration @ConditionalOnClass({RestTemplate.class}) @ConditionalOnBean({LoadBalancerClient.class}) @EnableConfigurationProperties({LoadBalancerRetryProperties.class}) public class LoadBalancerAutoConfiguration {@LoadBalanced@Autowired(required = false)private List<RestTemplate> restTemplates = Collections.emptyList();@Autowired(required = false)private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();public LoadBalancerAutoConfiguration() {}@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {return () -> {restTemplateCustomizers.ifAvailable((customizers) -> {Iterator var2 = this.restTemplates.iterator();while(var2.hasNext()) {RestTemplate restTemplate = (RestTemplate)var2.next();Iterator var4 = customizers.iterator();while(var4.hasNext()) {RestTemplateCustomizer cutomizer = (RestTemplateCustomizer)var4.next();customizer.customize(restTemplate);}}});};}#中間一大段代碼略@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return (restTemplate) -> {List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);};}} }復(fù)制代碼

@ConditionalOnClass({RestTemplate.class}) : RestTemplate必須位于當(dāng)前的工程環(huán)境中

@ConditionalOnBean({LoadBalancerClient.class}) :工程中必須存在實現(xiàn)LoadBalancerClient的Bean

@LoadBalanced @Autowired(required = false )private List<RestTemplate> restTemplates = Collections.emptyList(); 復(fù)制代碼

private List restTemplates = Collections.emptyList();

維護(hù)一個被@LoadBalanced的修飾的RestTemplate實例列表。

@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}復(fù)制代碼

創(chuàng)建一個攔截器 LoadBalancerInterceptor,用于在發(fā)起請求的時候進(jìn)行攔截。

@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return (restTemplate) -> {List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);};}} }復(fù)制代碼

為RestTemplate實例列表的請求restTemplate添加一個LoadBalancerInterceptor攔截器。

LoadBalancerInterceptor 類

作用:攔截RestTemplate請求,實現(xiàn)負(fù)載均衡

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;private LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null,"Request URI does not contain a valid hostname: " + originalUri);return this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution));}}復(fù)制代碼

#當(dāng)一個被@LoadBalanced修飾過的RestTemplate對象發(fā)送請求時,會被 LoadBalancerInterceptor攔截,通過request拿到URL,通過URL拿到服務(wù)名,最后再選擇對應(yīng)的實例發(fā)起請求。

RibbonLoadBalancerClient 類

作用:LoadBalancerClient 接口的具體實現(xiàn)

public class RibbonLoadBalancerClient implements LoadBalancerClient {private SpringClientFactory clientFactory;@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request)throws IOException {return execute(serviceId, request, null);}public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);Server server = getServer(loadBalancer, hint);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server,isSecure(server, serviceId),serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {Server server = null;if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();}if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);} else {RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);try {T returnVal = request.apply(serviceInstance);statsRecorder.recordStats(returnVal);return returnVal;} catch (IOException var8) {statsRecorder.recordStats(var8);throw var8;} catch (Exception var9) {statsRecorder.recordStats(var9);ReflectionUtils.rethrowRuntimeException(var9);return null;}}}protected Server getServer(ILoadBalancer loadBalancer) {return this.getServer(loadBalancer, (Object)null);}protected Server getServer(ILoadBalancer loadBalancer, Object hint) {return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");}} 復(fù)制代碼

注:

到此處代碼和SpringCloud微服務(wù)實戰(zhàn)書中版本的源碼已經(jīng)有了些許不同,實現(xiàn)上更加高效了。

首先通過默認(rèn)的execute實現(xiàn)將參數(shù)傳遞到第二個

public T execute(String serviceId, LoadBalancerRequest request, Object hint)

在第二個方法我們發(fā)現(xiàn)根據(jù)serviceId獲取了對應(yīng)的服務(wù)實例,并且封裝到了RibbonServer對象中。

最終交付到第三個方法

public T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request)

完成具體的執(zhí)行操作。

同時可以發(fā)現(xiàn)getServer的參數(shù)并不是根據(jù)之前的LoadBalancerClient的choose方法,而是使用了Ribbon本身ILoadBalancer接口定義的函數(shù)。

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {if (loadBalancer == null) {return null;}// Use 'default' on a null hint, or just pass it on?return loadBalancer.chooseServer(hint != null ? hint : "default");} 復(fù)制代碼

一探究竟:

ILoadBalancer : 接口

public interface ILoadBalancer {//向負(fù)載均衡器中維護(hù)的服務(wù)列表中添加新的服務(wù)實例public void addServers(List<Server> newServers);//通過某種策略,選擇一個服務(wù)實例public Server chooseServer(Object key);//用來標(biāo)識某個服務(wù)已經(jīng)停止服務(wù)public void markServerDown(Server server);//獲取當(dāng)前服務(wù)器列表。如果availableOnly為true的話,將會返回活躍的服務(wù)列表@Deprecatedpublic List<Server> getServerList(boolean availableOnly);//只返回正在啟動的可返回的服務(wù)列表public List<Server> getReachableServers();//返回所有已知的服務(wù)列表public List<Server> getAllServers(); }復(fù)制代碼

通過查看ILoadBalancer 的具體實現(xiàn)得知

ILoadBalancer -> BaseLoadBalancer(基礎(chǔ)實現(xiàn)) ->DynamicServerListLoadBalancer(擴(kuò)展實現(xiàn))

->ZoneAwareLoadBalancer(擴(kuò)展實現(xiàn))

那Ribbon默認(rèn)使用的哪種實現(xiàn)呢?

@Configuration @EnableConfigurationProperties // Order is important here, last should be the default, first should be optional // see // https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class }) public class RibbonClientConfiguration {@Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}} 復(fù)制代碼

通過查看Ribbon的配置類,我們發(fā)現(xiàn)Ribbon默認(rèn)采用的是ZoneAwareLoadBalancer實現(xiàn)

現(xiàn)在回到具體的RibbonLoadBalancerClient 類的execute方法中,可以大概知道Ribbon負(fù)載均衡的一個簡單的流程,即

getServer方法**->**ZoneAwareLoadBalancer的chooseServer方法獲取一個具體的服務(wù)實例

->包裝成一個RibbonServer對象

->LoadBalancerRequest的apply向一個具體的實例發(fā)送一個請求。

ServiceInstance 接口

public interface ServiceInstance {default String getInstanceId() {return null;}String getServiceId();String getHost();int getPort();boolean isSecure();URI getUri();Map<String, String> getMetadata();default String getScheme() {return null;}}復(fù)制代碼

ServiceInstance 的具體實現(xiàn)RibbonServer類

包含了server對象,服務(wù)名,是否使用https等標(biāo)識。

public static class RibbonServer implements ServiceInstance {private final String serviceId;private final Server server;private final boolean secure;private Map<String, String> metadata;public RibbonServer(String serviceId, Server server) {this(serviceId, server, false, Collections.emptyMap());}public RibbonServer(String serviceId, Server server, boolean secure,Map<String, String> metadata) {this.serviceId = serviceId;this.server = server;this.secure = secure;this.metadata = metadata;}@Overridepublic String getInstanceId() {return this.server.getId();}@Overridepublic String getServiceId() {return this.serviceId;}@Overridepublic String getHost() {return this.server.getHost();}@Overridepublic int getPort() {return this.server.getPort();}@Overridepublic boolean isSecure() {return this.secure;}@Overridepublic URI getUri() {return DefaultServiceInstance.getUri(this);}@Overridepublic Map<String, String> getMetadata() {return this.metadata;}public Server getServer() {return this.server;}@Overridepublic String getScheme() {return this.server.getScheme();}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("RibbonServer{");sb.append("serviceId='").append(serviceId).append('\'');sb.append(", server=").append(server);sb.append(", secure=").append(secure);sb.append(", metadata=").append(metadata);sb.append('}');return sb.toString();}}} 復(fù)制代碼

把思路回到LoadBalancerClient接口的apply方法上,然后突然發(fā)現(xiàn),之前SpringCloud微服務(wù)書上的實現(xiàn)早已不同,通過查看接口的實現(xiàn)關(guān)系,發(fā)現(xiàn)最終apply方法是 AsyncLoadBalancerInterceptor類來完成具體的實現(xiàn)的。

AsyncLoadBalancerInterceptor類

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {this.loadBalancer = loadBalancer;}public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();return (ListenableFuture)this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer);return execution.executeAsync(serviceRequest, body);}});} } 復(fù)制代碼

由于官方代碼并沒有提供注釋說明這個類的具體作用,通過類名稱大概可以猜出為一個異步的負(fù)載均衡攔截器,攔截Restplate請求,并實現(xiàn)apply方法向一個具體的實例發(fā)送請求。

具體執(zhí)行的代碼

HttpRequest serviceRequest = new ServiceRequestWrapper(request,instance, AsyncLoadBalancerInterceptor.this.loadBalancer);return execution.executeAsync(serviceRequest, body); 復(fù)制代碼

發(fā)現(xiàn)具體實現(xiàn)的時候,還傳入了一個ServiceRequestWrapper對象。

ServiceRequestWrapper類

public class ServiceRequestWrapper extends HttpRequestWrapper {private final ServiceInstance instance;private final LoadBalancerClient loadBalancer;public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,LoadBalancerClient loadBalancer) {super(request);this.instance = instance;this.loadBalancer = loadBalancer;}@Overridepublic URI getURI() {URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());return uri;}復(fù)制代碼

可以發(fā)現(xiàn)這個類繼承了HttpRequestWrapper 類,并且重寫了getURI()方法,同時在 getURI() 方法中,具體采納了RibbonLoadBalancerClient 的reconstructURI方法來組織具體請求的URL實例地址。

@Overridepublic URI reconstructURI(ServiceInstance instance, URI original) {Assert.notNull(instance, "instance can not be null");String serviceId = instance.getServiceId();RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);URI uri;Server server;if (instance instanceof RibbonServer) {RibbonServer ribbonServer = (RibbonServer) instance;server = ribbonServer.getServer();uri = updateToSecureConnectionIfNeeded(original, ribbonServer);}else {server = new Server(instance.getScheme(), instance.getHost(),instance.getPort());IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);ServerIntrospector serverIntrospector = serverIntrospector(serviceId);uri = updateToSecureConnectionIfNeeded(original, clientConfig,serverIntrospector, server);}return context.reconstructURIWithServer(server, uri);} 復(fù)制代碼

而在reconstructURIWithServer方法中,我們可以發(fā)現(xiàn)這樣一個執(zhí)行邏輯,首先從Server對象中獲得Host和port信息,然后從URI original對象中,獲取其他的請求信息,最終拼接成要訪問的具體的實例地址。

public URI reconstructURIWithServer(Server server, URI original) {String host = server.getHost();int port = server.getPort();String scheme = server.getScheme();if (host.equals(original.getHost()) && port == original.getPort()&& scheme == original.getScheme()) {return original;}if (scheme == null) {scheme = original.getScheme();}if (scheme == null) {scheme = deriveSchemeAndPortFromPartialUri(original).first();}try {StringBuilder sb = new StringBuilder();sb.append(scheme).append("://");if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {sb.append(original.getRawUserInfo()).append("@");}sb.append(host);if (port >= 0) {sb.append(":").append(port);}sb.append(original.getRawPath());if (!Strings.isNullOrEmpty(original.getRawQuery())) {sb.append("?").append(original.getRawQuery());}if (!Strings.isNullOrEmpty(original.getRawFragment())) {sb.append("#").append(original.getRawFragment());}URI newURI = new URI(sb.toString());return newURI; } catch (URISyntaxException e) {throw new RuntimeException(e);}} 復(fù)制代碼

負(fù)載均衡器

AbstractLoadBalancer 類

import java.util.List;public abstract class AbstractLoadBalancer implements ILoadBalancer {//一個關(guān)于服務(wù)實例的分組枚舉類,定義了三種不同的級別public enum ServerGroup{ALL,STATUS_UP,STATUS_NOT_UP }/*** 選擇一個服務(wù)實例,key為null,忽略key的條件判斷*/public Server chooseServer() {return chooseServer(null);}/*** 根據(jù)不同的分組類型來選擇返回不同的服務(wù)實例的列表*/public abstract List<Server> getServerList(ServerGroup serverGroup);/*** 獲取與負(fù)載均衡器相關(guān)的統(tǒng)計信息*/public abstract LoadBalancerStats getLoadBalancerStats(); }復(fù)制代碼

AbstractLoadBalancer是 ILoadBalancer的一個抽象實現(xiàn),同時也維護(hù)了一個關(guān)于服務(wù)實例的分組枚舉類,ServerGroup 同時呢,定義了三種類型,用來針對不同的情況。

  • ALL :所有服務(wù)實例
  • STATUS_UP :正常服務(wù)的實例
  • STATUS_NOT_UP :停止服務(wù)的實例

BaseLoadBalancer類

作用:負(fù)載均衡的基礎(chǔ)負(fù)載均衡器,定義了很多負(fù)載均衡器的基本內(nèi)容

接下來看BaseLoadBalancer針對負(fù)載均衡都做了哪些工作呢?

  • 維護(hù)了兩個服務(wù)實例列表,其中一個用于存放所有的實例,一個用于存放正常服務(wù)的實例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()); @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>()); 復(fù)制代碼
  • 定義了服務(wù)檢查的IPing對象,默認(rèn)為null

    protected IPing ping = null; 復(fù)制代碼
  • 定義了實施服務(wù)檢查的執(zhí)行策略對象,采用默認(rèn)策略實現(xiàn)。

    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY 復(fù)制代碼

    源碼部分:

    /*** Default implementation for <c>IPingStrategy</c>, performs ping* serially, which may not be desirable, if your <c>IPing</c>* implementation is slow, or you have large number of servers.*/ private static class SerialPingStrategy implements IPingStrategy {@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {int numCandidates = servers.length;boolean[] results = new boolean[numCandidates];logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);for (int i = 0; i < numCandidates; i++) {results[i] = false; /* Default answer is DEAD. */try {// NOTE: IFF we were doing a real ping// assuming we had a large set of servers (say 15)// the logic below will run them serially// hence taking 15 times the amount of time it takes// to ping each server// A better method would be to put this in an executor// pool// But, at the time of this writing, we dont REALLY// use a Real Ping (its mostly in memory eureka call)// hence we can afford to simplify this design and run// this// seriallyif (ping != null) {results[i] = ping.isAlive(servers[i]);}} catch (Exception e) {logger.error("Exception while pinging Server: '{}'", servers[i], e);}}return results;} } 復(fù)制代碼

    根據(jù)注釋的意思我們大概知道,如果Server列表過大時,采用默認(rèn)線性遍歷的方式可能會影響系統(tǒng)的性能,

    這個時候就需要 實現(xiàn) IPingStrategy 并重寫 pingServers 采用更為靈活的方式。

  • 定義了服務(wù)選擇器IRule對象,這里默認(rèn)采用RoundRobinRule實現(xiàn)

    RoundRobinRule代碼部分:

    public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server; } 復(fù)制代碼

    這里可以看出Ribbon默認(rèn)的服務(wù)選擇策略是線性選擇策略。

    舉個例子:第一次請求分發(fā)到了 9090 端口 第二次則會分發(fā)到 9091 然后 9092這樣來

  • 啟動Ping服務(wù),定時檢查當(dāng)前Server是否健康,默認(rèn)10秒

    protected int pingIntervalSeconds = 10; 復(fù)制代碼
  • 實現(xiàn)了ILoadBalancer的一系列操作

    //向服務(wù)列表中添加一個新的服務(wù) @Overridepublic void addServers(List<Server> newServers) {if (newServers != null && newServers.size() > 0) {try {ArrayList<Server> newList = new ArrayList<Server>();newList.addAll(allServerList);newList.addAll(newServers);setServersList(newList);} catch (Exception e) {logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);}}}//根據(jù)特定的key選擇一個服務(wù)實例 public Server chooseServer(Object key) {if (counter == null) {counter = createCounter();}counter.increment();if (rule == null) {return null;} else {try {return rule.choose(key);} catch (Exception e) {logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);return null;}}}//返回一個服務(wù)列表@Overridepublic List<Server> getServerList(boolean availableOnly) {return (availableOnly ? getReachableServers() : getAllServers());}//返回可用的實例列表@Overridepublic List<Server> getReachableServers() {return Collections.unmodifiableList(upServerList);}//返回所有的實例列表@Overridepublic List<Server> getAllServers() {return Collections.unmodifiableList(allServerList);}//標(biāo)記一個服務(wù)暫停服務(wù)public void markServerDown(Server server) {if (server == null || !server.isAlive()) {return;}logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());server.setAlive(false);// forceQuickPing();notifyServerStatusChangeListener(singleton(server));}復(fù)制代碼

DynamicServerListLoadBalancer類

作用:對基礎(chǔ)的負(fù)載均衡器BaseLoadBalancer做了擴(kuò)展,使其擁有服務(wù)實例清單在運行期的動態(tài)更新的能力。同時也具備了對服務(wù)實例清單的過濾功能。

在DynamicServerListLoadBalancer類的成員定義中,我們發(fā)現(xiàn)新增了一個成員

ServerList serverListImpl 對象,源碼如下:

public interface ServerList<T extends Server> {//獲取初始化時的服務(wù)列表public List<T> getInitialListOfServers();/***獲取更新時的服務(wù)列表*/public List<T> getUpdatedListOfServers(); } 復(fù)制代碼

通過查看ServerList的繼承關(guān)系圖,我們發(fā)現(xiàn)ServerList接口的實現(xiàn)不止一個,那 具體是使用了哪一個實現(xiàn)呢?

可以從如下思路入手,既然DynamicServerListLoadBalancer類實現(xiàn)了服務(wù)實例清單的動態(tài)更新,那Ribbon勢必要和Eureka整合,所以我們從Eureka對Ribbon的支持下手。

EurekaRibbonClientConfiguration類:

@Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(IClientConfig config,Provider<EurekaClient> eurekaClientProvider) {if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {return this.propertiesFactory.get(ServerList.class, config, serviceId);}DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);return serverList; } 復(fù)制代碼

可以看到默認(rèn)采用的DiscoveryEnabledNIWSServerList 實現(xiàn)。

DomainExtractingServerList類:

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {private ServerList<DiscoveryEnabledServer> list;private final RibbonProperties ribbon;private boolean approximateZoneFromHostname;public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,IClientConfig clientConfig, boolean approximateZoneFromHostname) {this.list = list;this.ribbon = RibbonProperties.from(clientConfig);this.approximateZoneFromHostname = approximateZoneFromHostname;}@Overridepublic List<DiscoveryEnabledServer> getInitialListOfServers() {List<DiscoveryEnabledServer> servers = setZones(this.list.getInitialListOfServers());return servers;}@Overridepublic List<DiscoveryEnabledServer> getUpdatedListOfServers() {List<DiscoveryEnabledServer> servers = setZones(this.list.getUpdatedListOfServers());return servers;}private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {List<DiscoveryEnabledServer> result = new ArrayList<>();boolean isSecure = this.ribbon.isSecure(true);boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();for (DiscoveryEnabledServer server : servers) {result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,this.approximateZoneFromHostname));}return result;}}...略}復(fù)制代碼

可以看到DomainExtractingServerList的具體實現(xiàn)是委托于其內(nèi)部list來實現(xiàn)的,內(nèi)部list通過DomainExtractingServerList構(gòu)造器傳入的DiscoveryEnabledNIWSServerList獲得。

DiscoveryEnabledNIWSServerList 類:

源碼部分:(部分代碼略)

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {public List<DiscoveryEnabledServer> getInitialListOfServers() {return this.obtainServersViaDiscovery();}public List<DiscoveryEnabledServer> getUpdatedListOfServers() {return this.obtainServersViaDiscovery();}rivate List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList();if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();if (this.vipAddresses != null) {String[] var3 = this.vipAddresses.split(",");int var4 = var3.length;for(int var5 = 0; var5 < var4; ++var5) {String vipAddress = var3[var5];List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);Iterator var8 = listOfInstanceInfo.iterator();while(var8.hasNext()) {InstanceInfo ii = (InstanceInfo)var8.next();if (ii.getStatus().equals(InstanceStatus.UP)) {if (this.shouldUseOverridePort) {if (logger.isDebugEnabled()) {logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);}InstanceInfo copy = new InstanceInfo(ii);if (this.isSecure) {ii = (new Builder(copy)).setSecurePort(this.overridePort).build();} else {ii = (new Builder(copy)).setPort(this.overridePort).build();}}DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);serverList.add(des);}}if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {break;}}}return serverList;} else {logger.warn("EurekaClient has not been initialized yet, returning an empty list");return new ArrayList();}}} 復(fù)制代碼
  • 第一步,通過eureka獲取服務(wù)實例listOfInstanceInfo列表

    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion) 復(fù)制代碼
  • 第二步,遍歷listOfInstanceInfo列表,如果該服務(wù)實例狀態(tài)為UP,則轉(zhuǎn)化成DiscoveryEnabledServer對象,然后添加到serverList里面。

  • 返回serverList服務(wù)實例列表。

通過查看上面的代碼大概知道了Ribbon是如何從Eureka注冊中心獲取最新的服務(wù)列表的,那Ribbon又是如何將獲取到的服務(wù)列表更新到本地的呢,這一切的關(guān)鍵是在DynamicServerListLoadBalancer類上,因為我們知道DynamicServerListLoadBalancer類具體實現(xiàn)了動態(tài)更新服務(wù)列表的功能。

通過查看源碼:

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {//更新的具體實現(xiàn)@Overridepublic void doUpdate() {updateListOfServers();}}; 復(fù)制代碼public interface ServerListUpdater {/*** an interface for the updateAction that actually executes a server list update*/public interface UpdateAction {void doUpdate();}/*** start the serverList updater with the given update action* This call should be idempotent.* 啟動服務(wù)更新器** @param updateAction*/void start(UpdateAction updateAction);/*** stop the serverList updater. This call should be idempotent*停止服務(wù)更新器*/void stop();/*** @return the last update timestamp as a {@link java.util.Date} string*獲取最近一次更新的時間*/String getLastUpdate();/*** @return the number of ms that has elapsed since last update* 獲取上一次更新到現(xiàn)在的時間間隔,單位為Ms毫秒*/long getDurationSinceLastUpdateMs();/*** @return the number of update cycles missed, if valid*/int getNumberMissedCycles();/*** @return the number of threads used, if vaid* 獲取核心線程數(shù)*/int getCoreThreads(); }復(fù)制代碼

通過查看ServerListUpdater 接口實現(xiàn)關(guān)系圖,我們大概發(fā)現(xiàn)Ribbon內(nèi)置了兩個實現(xiàn)。

  • PollingServerListUpdater :默認(rèn)采用的更新策略,采用定時任務(wù)的方式動態(tài)更新服務(wù)列表

    // msecs; 延遲一秒開始執(zhí)行 private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;以30秒為周期重復(fù)執(zhí)行 private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; 復(fù)制代碼
  • EurekaNotificationServerListUpdater :基于Eureka事件機(jī)制來驅(qū)動服務(wù)列表更新的實現(xiàn)。

那么,我們Ribbon默認(rèn)具體采用了哪一種更新策略呢,通過查看DynamicServerListLoadBalancer類的代碼,我們發(fā)現(xiàn)Ribbon采用的默認(rèn)服務(wù)更新器是PollingServerListUpdater

@Deprecatedpublic DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter) {this(clientConfig,rule,ping,serverList,filter,new PollingServerListUpdater());} 復(fù)制代碼

既然了解了默認(rèn)更新策略,那么我們再次回到我們的主角DynamicServerListLoadBalancer類上。

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {@Overridepublic void doUpdate() {updateListOfServers();}}; 復(fù)制代碼

通過代碼我們發(fā)現(xiàn)實際履行更新職責(zé)的方法是 updateListOfServers() ,不廢話,上代碼:

@VisibleForTestingpublic void updateListOfServers() {List<T> servers = new ArrayList<T>();if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);if (filter != null) {servers = filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);}}updateAllServerList(servers);} 復(fù)制代碼

通過查看代碼,我們發(fā)現(xiàn)流程大致如下:

  • 通過 ServerList的getUpdatedListOfServers() 方法獲取到最新的服務(wù)實例列表
  • 如果之前定義了過濾器,則按照某種規(guī)則實施過濾,最后返回
  • updateAllServerList(servers); 完成最后的更新操作。
public interface ServerListFilter<T extends Server> {public List<T> getFilteredListOfServers(List<T> servers);}復(fù)制代碼

通過查看繼承實現(xiàn)關(guān)系圖,發(fā)現(xiàn)ServerListFilter的直接實現(xiàn)類為:AbstractServerListFilter

其中ZoneAffinityServerListFilter 繼承了 AbstractServerListFilter ,然后得ZoneAffinityServerListFilter 真?zhèn)鞯淖宇愑钟泻枚?#xff0c;這里著重介紹AbstractServerListFilterZoneAffinityServerListFilter 實現(xiàn)

  • AbstractServerListFilter :抽象過濾器,依賴LoadBalancerStats對象實現(xiàn)過濾。LoadBalancerStats存儲了負(fù)載均衡器的一些屬性和統(tǒng)計信息。
  • ZoneAffinityServerListFilter:此服務(wù)器列表篩選器處理基于區(qū)域關(guān)聯(lián)性篩選服務(wù)器。它會過濾掉一些服務(wù)實例和消費者不在一個Zone(區(qū)域)的實例。

ZoneAwareLoadBalancer類

功能:ZoneAwareLoadBalancer負(fù)載均衡器是對DynamicServerListLoadBalancer類的擴(kuò)展和補(bǔ)充,該負(fù)載混合器實現(xiàn)了Zone(區(qū)域)的概念,避免了因為跨區(qū)域而導(dǎo)致的區(qū)域性故障,從而實現(xiàn)了服務(wù)的高可用。

那么ZoneAwareLoadBalancer具體做了哪些工作來實現(xiàn)這些功能的呢?

第一:重寫了DynamicServerListLoadBalancer的setServerListForZones方法:

原版:

protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {LOGGER.debug("Setting server list for zones: {}", zoneServersMap);getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);} 復(fù)制代碼

ZoneAwareLoadBalancer類版:

@Overrideprotected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {super.setServerListForZones(zoneServersMap);if (balancers == null) {//balancers 用來存儲每個String對應(yīng)的Zonebalancers = new ConcurrentHashMap<String, BaseLoadBalancer>();}//設(shè)置對應(yīng)zone下面的實例清單for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {String zone = entry.getKey().toLowerCase();getLoadBalancer(zone).setServersList(entry.getValue());}//檢查是否有不再擁有服務(wù)器的區(qū)域//并將列表設(shè)置為空,以便與區(qū)域相關(guān)的度量不為空//包含過時的數(shù)據(jù)// 防止因為Zone的信息過時而干擾具體實例的選擇算法。for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {existingLBEntry.getValue().setServersList(Collections.emptyList());}}} 復(fù)制代碼

那ZoneAwareLoadBalancer類是具體如何來選擇具體的服務(wù)實例呢,

@Overridepublic Server chooseServer(Object key) {if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {logger.debug("Zone aware logic disabled or there is only one zone");return super.chooseServer(key);}Server server = null;try {LoadBalancerStats lbStats = getLoadBalancerStats();//為所有Zone都創(chuàng)建一個快照Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);logger.debug("Zone snapshots: {}", zoneSnapshot);if (triggeringLoad == null) {triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);}if (triggeringBlackoutPercentage == null) {triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);}Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());logger.debug("Available zones: {}", availableZones);if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);logger.debug("Zone chosen: {}", zone);if (zone != null) {BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);server = zoneLoadBalancer.chooseServer(key);}}} catch (Exception e) {logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);}if (server != null) {return server;} else {logger.debug("Zone avoidance logic is not invoked.");return super.chooseServer(key);}} 復(fù)制代碼

從源碼中可以看出來, getLoadBalancerStats().getAvailableZones().size() <= 1 只有在當(dāng)前的Zone區(qū)域的數(shù)量大于1的時候才會采用區(qū)域選擇策略,否則的話,則'return super.chooseServer(key)' 什么也不做,采用父類的實現(xiàn)。

在選擇具體的服務(wù)實例中,ZoneAwareLoadBalancer主要做了以下幾件事:

  • 為所有Zone區(qū)域分別創(chuàng)建一個快照,存儲在zoneSnapshot 里面

  • 通過Zone快照中的信息,按照某種策略例如Zone的服務(wù)實例數(shù)量,故障率等等來篩選掉不符合條件的Zone區(qū)域。

  • 如果發(fā)現(xiàn)沒有符合剔除要求的區(qū)域,同時實例最大平均負(fù)載小于閾值(默認(rèn)百分之20),就直接返回所有可以的Zone區(qū)域,否則,隨機(jī)剔除一個最壞的Zone。

  • 獲得的可用的Zone列表不為空,并且數(shù)量小于之前快照中的總數(shù)量,則根據(jù)IRule規(guī)則隨機(jī)選一個Zone區(qū)域

    if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);logger.debug("Zone chosen: {}", zone);if (zone != null) {BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);server = zoneLoadBalancer.chooseServer(key);}} 復(fù)制代碼
  • 確定了最終的Zone之后,最終調(diào)用 BaseLoadBalancer的chooseServer來選擇一個合適的服務(wù)實例。

負(fù)載均衡策略

通過上面的分析,我們發(fā)現(xiàn)當(dāng)一個請求過來時,會被攔截交給相應(yīng)的負(fù)載均衡器,然后不同的負(fù)載均衡器根據(jù)不同的策略來選擇合適的服務(wù)實例。在這里我們是知道Ribbon是根據(jù)不同的Rule來實現(xiàn)對實例的一個選擇的,那么Ribbon具體提供了哪些規(guī)則供我們使用呢?通過查看Ribbon的IRule接口的實現(xiàn)集成關(guān)系圖,我們最終可以發(fā)現(xiàn),Ribbon主要提供了以下幾個規(guī)則實現(xiàn)的。

  • RandomRule 類:該策略實現(xiàn)了從服務(wù)實例清單中隨機(jī)選擇一個服務(wù)實例的功能

  • RoundRobinRule類:該策略實現(xiàn)了輪詢的方式從服務(wù)實例清單中依次選擇服務(wù)實例的功能RetryRule

  • RetryRule類:該策略實現(xiàn)了具備重試機(jī)制的實例選擇功能

  • WeightedResponseTimeRule類:根據(jù)權(quán)重來選擇實例

  • BestAvailableRule類:選擇一個最空閑的實例

  • PredicateBasedRule 類:先過濾,然后再以輪詢的方式選擇實例

    ...

IRule接口:

public interface IRule{public Server choose(Object key);public void setLoadBalancer(ILoadBalancer lb);public ILoadBalancer getLoadBalancer(); }復(fù)制代碼

AbstractLoadBalancerRule抽象類:

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {private ILoadBalancer lb;@Overridepublic void setLoadBalancer(ILoadBalancer lb){this.lb = lb;}@Overridepublic ILoadBalancer getLoadBalancer(){return lb;} } 復(fù)制代碼

RandomRule類

功能:該策略實現(xiàn)了從服務(wù)實例清單中隨機(jī)選擇一個服務(wù)實例的功能。

查看代碼發(fā)現(xiàn)具體的實例選擇并沒有由默認(rèn)的choose(Object key)來實現(xiàn),而是委托給了同類下的choose(ILoadBalancer lb, Object key)方法來完成實際的實例選擇工作。

public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {if (Thread.interrupted()) {return null;}List<Server> upList = lb.getReachableServers();List<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {/** No servers. End regardless of pass, because subsequent passes* only get more restrictive.*/return null;}int index = chooseRandomInt(serverCount);server = upList.get(index);if (server == null) {/** The only time this should happen is if the server list were* somehow trimmed. This is a transient condition. Retry after* yielding.*/Thread.yield();continue;}if (server.isAlive()) {return (server);}// Shouldn't actually happen.. but must be transient or a bug.server = null;Thread.yield();}return server;}復(fù)制代碼

注:如果獲取不到服務(wù)實例,則可能存在并發(fā)的bug

RoundRobinRule類

功能:該策略實現(xiàn)了輪詢的方式從服務(wù)實例清單中依次選擇服務(wù)實例的功能

public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {//reachableServers 可用的服務(wù)實例清單List<Server> reachableServers = lb.getReachableServers();//allServers 獲取所有可用的服務(wù)列表List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;}private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}}復(fù)制代碼

源碼分析:可以發(fā)現(xiàn)RoundRobinRule的實現(xiàn)邏輯和RandomRule非常類似,我們可以看出來,RoundRobinRule定義了一個計數(shù)器變量count,該計數(shù)器會在每次循環(huán)后自動疊加,當(dāng)獲取不到Server的次數(shù)超過十次時,會結(jié)束嘗試,并發(fā)出警告:No available alive servers after 10 tries from load balancer。

而線性輪詢的實現(xiàn)則是通過 incrementAndGetModulo(int modulo)來實現(xiàn)的.

RetryRule類:

功能:該策略實現(xiàn)了具備重試機(jī)制的實例選擇功能

public Server choose(ILoadBalancer lb, Object key) {//請求時間long requestTime = System.currentTimeMillis();//deadline 截止期限long deadline = requestTime + maxRetryMillis;Server answer = null;answer = subRule.choose(key);if (((answer == null) || (!answer.isAlive()))&& (System.currentTimeMillis() < deadline)) {InterruptTask task = new InterruptTask(deadline- System.currentTimeMillis());while (!Thread.interrupted()) {answer = subRule.choose(key);if (((answer == null) || (!answer.isAlive()))&& (System.currentTimeMillis() < deadline)) {/* pause and retry hoping it's transient */Thread.yield();} else {break;}}task.cancel();}if ((answer == null) || (!answer.isAlive())) {return null;} else {return answer;}}復(fù)制代碼

默認(rèn)使用的是RoundRobinRule策略。期間如果能選擇到實例就返回,如果選擇不到就根據(jù)設(shè)置的嘗試結(jié)束時間為閾值,如果超過截止期限則直接返回null。

WeightedResponseTimeRule類

功能:根據(jù)權(quán)重來選擇實例

主要有以下三個核心內(nèi)容:

  • 定時任務(wù)
  • 權(quán)重計算
  • 實例選擇

1. 定時任務(wù)

void initialize(ILoadBalancer lb) { if (serverWeightTimer != null) {serverWeightTimer.cancel();}serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"+ name, true);//啟動定時任務(wù)serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,serverWeightTaskTimerInterval);// do a initial runServerWeight sw = new ServerWeight();sw.maintainWeights();Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {public void run() {logger.info("Stopping NFLoadBalancer-serverWeightTimer-"+ name);serverWeightTimer.cancel();}}));}復(fù)制代碼

WeightedResponseTimeRule類在初始化的時候會先定義一個計時器,然后會啟動一個定時任務(wù),用來為每個服務(wù)實例計算權(quán)重,該任務(wù)默認(rèn)每30秒執(zhí)行一次。

class DynamicServerWeightTask extends TimerTask {public void run() {ServerWeight serverWeight = new ServerWeight();try {serverWeight.maintainWeights();} catch (Exception e) {logger.error("Error running DynamicServerWeightTask for {}", name, e);}}} 復(fù)制代碼

2.權(quán)重計算

通過上面的DynamicServerWeightTask的代碼呢,我們可以大致了解到,權(quán)重計算的功能呢實際是由ServerWeight的maintainWeights()來執(zhí)行的。少廢話,上代碼。

public void maintainWeights() {ILoadBalancer lb = getLoadBalancer();if (lb == null) {return;}if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {return; }try {logger.info("Weight adjusting job started");AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;LoadBalancerStats stats = nlb.getLoadBalancerStats();if (stats == null) {// no statistics, nothing to doreturn;}double totalResponseTime = 0;// find maximal 95% response timefor (Server server : nlb.getAllServers()) {// this will automatically load the stats if not in cacheServerStats ss = stats.getSingleServerStat(server);totalResponseTime += ss.getResponseTimeAvg();}// weight for each server is (sum of responseTime of all servers - responseTime)// so that the longer the response time, the less the weight and the less likely to be chosenDouble weightSoFar = 0.0;// create new list and hot swap the referenceList<Double> finalWeights = new ArrayList<Double>();for (Server server : nlb.getAllServers()) {ServerStats ss = stats.getSingleServerStat(server);double weight = totalResponseTime - ss.getResponseTimeAvg();weightSoFar += weight;finalWeights.add(weightSoFar); }setWeights(finalWeights);} catch (Exception e) {logger.error("Error calculating server weights", e);} finally {serverWeightAssignmentInProgress.set(false);}}}復(fù)制代碼

那WeightedResponseTimeRule是如何計算權(quán)重的呢?主要分為以下兩步:

  • 先遍歷服務(wù)器列表,并得到每個服務(wù)器的平均響應(yīng)時間,遍歷過程中對其求和,遍歷結(jié)束后得到總響應(yīng)時間totalResponseTime。
  • 再一次遍歷服務(wù)器列表,并將總響應(yīng)時間totalResponseTime減去每個服務(wù)器的平均響應(yīng)時間作為權(quán)重weight,再將這之前的所以權(quán)重累加到weightSoFar 變量中,并且保存到finalWeights供choose使用。
  • 3.實例選擇

    public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {//獲取當(dāng)前引用,以防它被其他線程更改List<Double> currentWeights = accumulatedWeights;if (Thread.interrupted()) {return null;}List<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {return null;}int serverIndex = 0;// 列表中的最后一個是所有權(quán)重的和double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); //尚未命中任何服務(wù)器,且未初始化總重量//使用循環(huán)操作if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {server = super.choose(getLoadBalancer(), key);if(server == null) {return server;}} else {//生成一個從0(含)到maxTotalWeight(不含)之間的隨機(jī)權(quán)重double randomWeight = random.nextDouble() * maxTotalWeight;//根據(jù)隨機(jī)索引選擇服務(wù)器索引int n = 0;for (Double d : currentWeights) {if (d >= randomWeight) {serverIndex = n;break;} else {n++;}}server = allList.get(serverIndex);}if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive()) {return (server);}// Next.server = null;}return server;} 復(fù)制代碼

    執(zhí)行步驟:

    • 生成一個從0(含)到maxTotalWeight(不含)之間的隨機(jī)權(quán)重
    • 遍歷權(quán)重列表,比較權(quán)重值與隨機(jī)數(shù)的大小,如果權(quán)重值大于等于隨機(jī)數(shù),就當(dāng)前權(quán)重列表的索引值去服務(wù)實例列表中列表中獲取具體的實例。

    BestAvailableRule類

    功能:選擇一個最空閑的實例

    @Overridepublic Server choose(Object key) {if (loadBalancerStats == null) {return super.choose(key);}List<Server> serverList = getLoadBalancer().getAllServers();//minimalConcurrentConnections:最小并發(fā)連接數(shù)int minimalConcurrentConnections = Integer.MAX_VALUE;long currentTime = System.currentTimeMillis();Server chosen = null;for (Server server: serverList) {ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);if (!serverStats.isCircuitBreakerTripped(currentTime)) {//concurrentConnections:并發(fā)連接數(shù)int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);if (concurrentConnections < minimalConcurrentConnections) {minimalConcurrentConnections = concurrentConnections;chosen = server;}}}if (chosen == null) {return super.choose(key);} else {return chosen;}} 復(fù)制代碼

    通過查看源碼可以得知BestAvailableRule大致采用了如下策略來選擇服務(wù)實例,根據(jù)loadBalancerStats中的統(tǒng)計信息通過遍歷負(fù)載均衡器維護(hù)的所有服務(wù)實例 選出并發(fā)連接數(shù)最少的那一個,即最空閑的實例。

    如果loadBalancerStats為空的話,則直接調(diào)用父類ClientConfigEnabledRoundRobinRule的實現(xiàn),即RoundRobinRule,線性輪詢的方式。

    PredicateBasedRule 類

    功能:先過濾,然后再以輪詢的方式選擇實例

    @Overridepublic Server choose(Object key) {ILoadBalancer lb = getLoadBalancer();Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);if (server.isPresent()) {return server.get();} else {return null;} } 復(fù)制代碼

    實現(xiàn)邏輯:通過子類中實現(xiàn)的predicate邏輯來過濾一部分服務(wù)實例,然后再以線性輪詢的方式從過濾之后的服務(wù)實例清單中選擇一個。

    當(dāng)然,PredicateBasedRule本身是一個抽象類,必然Ribbon提供了相應(yīng)的子類實現(xiàn),我們看到有ZoneAvoidanceRule和AvailabilityFilteringRule,分別對PredicateBasedRule做了相應(yīng)的擴(kuò)展,有興趣的小伙伴可以下去自行研究。

    配置詳解:

    自動化配置:

    同樣,得益于Springboot的自動化配置,大大降低了開發(fā)者上手的難度,在引入Spring-Clould-Ribbon依賴之后,便能夠自動構(gòu)建下面這些接口的實現(xiàn)。

    • IClientConfig:Ribbon客戶端配置接口類,默認(rèn)實現(xiàn):com.netflix.client.config.DefaultClientConfigImpl
    • IRule: Ribbon:服務(wù)實例選擇策略接口類,默認(rèn)采用的實現(xiàn):com.netflix.loadbalancer.ZoneAvoidanceRule
    • IPing:Ribbon:實例檢查策略接口類,默認(rèn)實現(xiàn):NoOpPing 即不檢查
    • ServerList:服務(wù)實例清單維護(hù)機(jī)制接口類,默認(rèn)實現(xiàn)ConfigurationBasedServerList 當(dāng)整合Eureka的情況下,則使用DiscoveryEnabledNIWSServerList類
    • ServerListFilter:服務(wù)實例過濾策略接口類,默認(rèn)實現(xiàn):ZoneAffinityServerListFilter 根據(jù)區(qū)域過濾,
    • ILoadBalancer:負(fù)載均衡器接口類,默認(rèn)實現(xiàn):ZoneAwareLoadBalancer 具備區(qū)域感知

    替換默認(rèn)配置

    Ribbon同時支持部分默認(rèn)配置的替換,這為使用針對不同場景的定制化方案提供了可能。目前的話支持兩種方式的替換(我只知道這兩種)。

    • 創(chuàng)建實例覆蓋默認(rèn)實現(xiàn)

    • 配置文件配置

    創(chuàng)建實例覆蓋默認(rèn)實現(xiàn)

    例:將默認(rèn)的負(fù)載均衡策略替換成自己自定義的策略。

    @Beanpublic IRule myRule() {return new MyRule();} 復(fù)制代碼

    配置文件配置

    通過使用.ribbon. = value 方式

    在application.properties中添加如下代碼,即可以將默認(rèn)的IPing策略替換成自己自定義的策略。

    ### 擴(kuò)展 IPing 實現(xiàn) user-service-provider.ribbon.NFLoadBalancerPingClassName = \com.xxxx.demo.user.ribbon.client.ping.MyPing 復(fù)制代碼

    MyPing代碼(小馬哥微服務(wù)實戰(zhàn)版):

    public class MyPing implements IPing {@Overridepublic boolean isAlive(Server server) {String host = server.getHost();int port = server.getPort();// /health endpoint// 通過 Spring 組件來實現(xiàn)URL 拼裝UriComponentsBuilder builder = UriComponentsBuilder.newInstance();builder.scheme("http");builder.host(host);builder.port(port);builder.path("/actuator/health");URI uri = builder.build().toUri();RestTemplate restTemplate = new RestTemplate();ResponseEntity responseEntity = restTemplate.getForEntity(uri, String.class);// 當(dāng)響應(yīng)狀態(tài)等于 200 時,返回 true ,否則 falsereturn HttpStatus.OK.equals(responseEntity.getStatusCode());}}復(fù)制代碼

    MyRule代碼(小馬哥微服務(wù)實戰(zhàn)版):

    public class MyRule extends AbstractLoadBalancerRule {@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {}@Overridepublic Server choose(Object key) {ILoadBalancer loadBalancer = getLoadBalancer();//獲取所有可達(dá)服務(wù)器列表List<Server> servers = loadBalancer.getReachableServers();if (servers.isEmpty()) {return null;}// 永遠(yuǎn)選擇最后一臺可達(dá)服務(wù)器Server targetServer = servers.get(servers.size() - 1);return targetServer;}}復(fù)制代碼

    總結(jié):

    通過本次對Ribbon源碼的一個簡單初探,慢慢明白一個優(yōu)秀的框架的優(yōu)秀之處了,再看看自己之前寫的代碼就有些難以直視了,一個框架的設(shè)計往往不僅僅是實現(xiàn)了某些功能,也同時考慮到了各種不同的使用場景,這樣可以保證框架可以勝任大多數(shù)簡單的項目和大型項目。同時框架內(nèi)部有很多實現(xiàn)都很高效,很少出現(xiàn)有什么極度不合理的地方,同時代碼復(fù)用性也很高,看似幾十上百個類實則職責(zé)分明,井井有條,在保證功能的情況下同時又有良好的擴(kuò)展性。因為平常學(xué)業(yè)繁忙(主要是懶還愛玩兒),刻苦學(xué)習(xí)(期末全靠水過去),所以Ribbon這篇磕磕絆絆寫了有半個多月的時間。好在自己終于堅持把它給看完了。后面的打算呢,將會陸續(xù)把自己學(xué)習(xí)java微服務(wù)的筆記整理好開源至本人的github上,希望可以幫助到一些剛開始入門的小伙伴們,也騙一些star(滑稽),最后,我是韓數(shù),計算機(jī)小白,本科在讀,我喜歡唱,跳...

    轉(zhuǎn)載于:https://juejin.im/post/5d09c072f265da1b7c611c82

    與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

    總結(jié)

    以上是生活随笔為你收集整理的万字长文浅析SpringCould微服务负载均衡框架Ribbon源码(字多慎入)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 久久婷婷国产麻豆91天堂 | 西西人体www大胆高清 | 91久久久久久久久久久久 | 中文字幕亚洲不卡 | 欧美黑人巨大xxx极品 | 一本色道久久88加勒比—综合 | 久久av一区| 欧美偷拍少妇精品一区 | 日韩性生交大片免费看 | 午夜香蕉网 | 天天干天天弄 | 三级欧美日韩 | 中国zzji女人高潮免费 | 欧美一区二区三区小说 | 日韩视频福利 | 亚洲最大毛片 | 久久大尺度 | 国产美女喷水 | 中文字幕精 | 亚洲人成无码网站久久99热国产 | 操操久久 | 中文字幕第一区 | 久久久久久a | 激情视频在线观看免费 | 中文字幕高清在线观看 | 成人精品一区二区 | 在线视频日韩欧美 | 亚洲伦理一区二区三区 | 天天射,天天干 | 欧美日韩性 | 看日本黄色录像 | 超碰超碰超碰超碰超碰 | 九色porny原创自拍 | 成人性生活视频 | com国产| 青青青手机视频在线观看 | 五月婷婷视频在线观看 | av中文字| 色在线看 | 在线免费观看国产视频 | 美女18网站 | 黄色激情视频网站 | 69久久久久 | 91传媒在线免费观看 | 国产视频污 | 极品在线视频 | 亚洲视频图片小说 | 窝窝午夜精品一区二区 | 国偷自产视频一区二区久 | 九九热视频精品在线观看 | 国产婷婷久久 | 四虎影视免费在线观看 | 国产精品久久久久久久久免费桃花 | 91蝌蚪网 | 99这里只有精品 | 成人中文字幕在线 | 91视频免费播放 | 婷婷99| 中国农村一级片 | 国产成人精品免高潮费视频 | 一级黄色免费看 | 五月网婷婷 | 成人特级毛片69免费观看 | 国产午夜精品一区二区三区四区 | 日日夜夜天天 | 午夜影院福利 | 草逼国产| 亚洲乱码国产乱码精品精 | 91精品国产综合久久久久 | av在线视| 欧洲日韩一区二区三区 | 一级特黄特色的免费大片视频 | 亚洲小说专区 | 久久wwww | 精品在线免费观看视频 | 成人www| 亚洲成熟少妇 | 国产在线观看免费视频软件 | 午夜国产视频 | 欧美日韩免费视频 | www.香蕉.com | a级一a一级在线观看 | 日日操夜夜爽 | 亚洲精品激情视频 | 欧美黑人做爰爽爽爽 | 无套内谢少妇高潮免费 | av观看一区| 在线免费一区 | 日韩一区二区三区在线 | 欧美精品久久久久久久久 | 久久国产精品影视 | 亚洲一二三在线 | 放荡闺蜜高h苏桃情事h | 午夜亚洲AV永久无码精品蜜芽 | 日本黄色xxxxx | 91亚洲天堂 | 杨幂一区二区三区免费看视频 | 一出一进一爽一粗一大视频 | 极品少妇一区 |