日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Eureka源码分析之心跳续约及自我保护机制

發布時間:2023/12/20 javascript 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Eureka源码分析之心跳续约及自我保护机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Eureka-Server是如何判斷一個服務不可用的?

Eureka是通過心跳續約的方式來檢查各個服務提供者的健康狀態。

實際上,在判斷服務不可用這個部分,會分為兩塊邏輯。

  • Eureka-Server需要定期檢查服務提供者的健康狀態。
  • Eureka-Client在運行過程中需要定期更新注冊信息。
  • Eureka的心跳續約機制如下圖所示。

  • 客戶端在啟動時, 會開啟一個心跳任務,每隔30s向服務單發送一次心跳請求。
  • 服務端維護了每個實例的最后一次心跳時間,客戶端發送心跳包過來后,會更新這個心跳時間。
  • 服務端在啟動時,開啟了一個定時任務,該任務每隔60s執行一次,檢查每個實例的最后一次心跳時間是否超過90s,如果超過則認為過期,需要剔除。
  • 關于上述流程中涉及到的時間,可以通過以下配置來更改.

    #Server 至上一次收到 Client 的心跳之后,等待下一次心跳的超時時間,在這個時間內若沒收到下一次心跳,則將移除該 Instance。 eureka.instance.lease-expiration-duration-in-seconds=90 # Server 清理無效節點的時間間隔,默認60000毫秒,即60秒。 eureka.server.eviction-interval-timer-in-ms=60

    客戶端心跳發起流程

    心跳續約是客戶端發起的,每隔30s執行一次。

    DiscoveryClient.initScheduledTasks

    繼續回到
    DiscoveryClient.initScheduledTasks?方法中,

    private void initScheduledTasks() {//省略....heartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);//省略.... }

    renewalIntervalInSecs=30s, 默認每隔30s執行一次。

    HeartbeatThread

    這個線程的實現很簡單,調用?renew()?續約,如果續約成功,則更新最后一次心跳續約時間。

    private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}} }

    在?renew()?方法中,調用EurekaServer的?"apps/" + appName + '/' + id;?這個地址,進行心跳續約。

    boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;} }

    服務端收到心跳處理

    服務端具體為調用[
    com.netflix.eureka.resources]包下的InstanceResource類的renewLease方法進行續約,代碼如下

    @PUT public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);//調用renew進行續約boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// Not found in the registry, immediately ask for a registerif (!isSuccess) { //如果續約失敗,返回異常logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponse response;//校驗客戶端與服務端的時間差異,如果存在問題則需要重新發起注冊if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}} else {response = Response.ok().build(); // 續約成功,返回200}logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());return response; }

    InstanceRegistry.renew

    renew的實現方法如下,主要有兩個流程

  • 從服務注冊列表中找到匹配當前請求的實例
  • 發布EurekaInstanceRenewedEvent事件
  • @Override public boolean renew(final String appName, final String serverId,boolean isReplication) {log("renew " + appName + " serverId " + serverId + ", isReplication {}"+ isReplication);//獲取所有服務注冊信息List<Application> applications = getSortedApplications();for (Application input : applications) { //逐一遍歷if (input.getName().equals(appName)) { //如果當前續約的客戶端和某個服務注冊信息節點相同InstanceInfo instance = null;for (InstanceInfo info : input.getInstances()) { //遍歷這個服務集群下的所有節點,找到某個匹配的實例instance返回。if (info.getId().equals(serverId)) {instance = info; //break;}}//發布EurekaInstanceRenewedEvent事件,這個事件在EurekaServer中并沒有處理,我們可以監聽這個事件來做一些事情,比如做監控。publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,instance, isReplication));break;}}return super.renew(appName, serverId, isReplication); }

    super.renew

    public boolean renew(final String appName, final String id, final boolean isReplication) {if (super.renew(appName, id, isReplication)) { //調用父類的續約方法,如果續約成功replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); //同步給集群中的所有節點return true;}return false; }

    AbstractInstanceRegistry.renew

    在這個方法中,會拿到應用對應的實例列表,然后調用Lease.renew()去進行心跳續約。

    public boolean renew(String appName, String id, boolean isReplication) {RENEW.increment(isReplication);Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); //根據服務名字獲取實例信息Lease<InstanceInfo> leaseToRenew = null;if (gMap != null) { leaseToRenew = gMap.get(id); //獲取需要續約的服務實例,}if (leaseToRenew == null) { //如果為空,說明這個服務實例不存在,直接返回續約失敗RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else { //表示實例存在InstanceInfo instanceInfo = leaseToRenew.getHolder(); //獲取實例的基本信息if (instanceInfo != null) { //實例基本信息不為空// touchASGCache(instanceInfo.getASGName());//獲取實例的運行狀態InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { //如果運行狀態未知,也返回續約失敗logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}//如果當前請求的實例信息if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),overriddenInstanceStatus.name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}//更新上一分鐘的續約數量renewsLastMin.increment();leaseToRenew.renew(); //續約return true;} }

    續約的實現,就是更新服務端最后一次收到心跳請求的時間。

    public void renew() {lastUpdateTimestamp = System.currentTimeMillis() + duration;}

    Eureka的自我保護機制

    實際,心跳檢測機制有一定的不確定性,比如服務提供者可能是正常的,但是由于網絡通信的問題,導致在90s內沒有收到心跳請求,那將會導致健康的服務被誤殺。

    為了避免這種問題,Eureka提供了一種叫?自我保護?機制的東西。簡單來說,就是開啟自我保護機制后,Eureka Server會包這些服務實例保護起來,避免過期導致實例被剔除的問題,從而保證Eurreka集群更加健壯和穩定。

    進入自我保護狀態后,會出現以下幾種情況

    • Eureka Server不再從注冊列表中移除因為長時間沒有收到心跳而應該剔除的過期服務,如果在保護期內如果服務剛好這個服務提供者非正常下線了,此時服務消費者就會拿到一個無效的服務實例,此時會調用失敗,對于這個問題需要服務消費者端要有一些容錯機制,如重試,斷路器等!
    • Eureka Server仍然能夠接受新服務的注冊和查詢請求,但是不會被同步到其他節點上,保證當前節點依然可用。

    Eureka自我保護機制,通過配置
    eureka.server.enable-self-preservation?來【?true?】打開/【?false?禁用】自我保護機制,默認打開狀態,建議生產環境打開此配置。

    自我保護機制應該如何設計,才能更加精準地控制到?“是網絡異?!?導致的通信延遲,而不是服務宕機呢?

    Eureka是這么做的: 如果低于85%的客戶端節點都沒有正常的心跳,那么Eureka Server就認為客戶端與注冊中心出現了網絡故障,Eureka Server自動進入自我保護狀態 .

    其中,?85%?這個閾值,可以通過下面這個配置來設置

    # 自我保護續約百分比,默認是0.85 eureka.server.renewal-percent-threshold=0.85

    但是還有個問題,超過誰的85%呢?這里有一個預期的續約數量,這個數量計算公式如下:

    //自我保護閥值 = 服務總數 * 每分鐘續約數(60S/客戶端續約間隔) * 自我保護續約百分比閥值因子

    假設如果有?100?個服務,續約間隔是?30S?,自我保護閾值?0.85?,那么它的預期續約數量為:

    自我保護閾值 =100 * 60 / 30 * 0.85 = 170。

    自動續約的閾值設置

    在EurekaServerBootstrap這個類的?contextInitialized?方法中,會調用?initEurekaServerContext?進行初始化

    public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);} }

    繼續往下看。

    protected void initEurekaServerContext() throws Exception {EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();//...registry.openForTraffic(applicationInfoManager, registryCount); }

    在openForTraffic方法中,會初始化
    expectedNumberOfClientsSendingRenews?這個值,這個值的含義是:?預期每分鐘收到續約的客戶端數量,取決于注冊到eureka server上的服務數量

    @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.this.expectedNumberOfClientsSendingRenews = count; //初始值是1.updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit(); }

    updateRenewsPerMinThreshold

    接著調用
    updateRenewsPerMinThreshold?方法,會更新一個每分鐘最小的續約數量,也就是Eureka Server期望每分鐘收到客戶端實例續約的總數的閾值。如果小于這個閾值,就會觸發自我保護機制。

    protected void updateRenewsPerMinThreshold() {this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())* serverConfig.getRenewalPercentThreshold()); } //自我保護閥值 = 服務總數 * 每分鐘續約數(60S/客戶端續約間隔) * 自我保護續約百分比閥值因子
    • getExpectedClientRenewalIntervalSeconds,客戶端的續約間隔,默認為30s
    • getRenewalPercentThreshold,自我保護續約百分比閾值因子,默認0.85。 也就是說每分鐘的續約數量要大于85%

    預期值的變化觸發機制


    expectedNumberOfClientsSendingRenews?和
    numberOfRenewsPerMinThreshold?這兩個值,會隨著新增服務注冊以及服務下線的觸發而發生變化。

    PeerAwareInstanceRegistryImpl.cancel

    當服務提供者主動下線時,表示這個時候Eureka-Server要剔除這個服務提供者的地址,同時也代表這這個心跳續約的閾值要發生變化。所以在
    PeerAwareInstanceRegistryImpl.cancel?中可以看到數據的更新

    調用路徑
    PeerAwareInstanceRegistryImpl.cancel ->?AbstractInstanceRegistry.cancel->internalCancel

    服務下線之后,意味著需要發送續約的客戶端數量遞減了,所以在這里進行修改

    protected boolean internalCancel(String appName, String id, boolean isReplication) {//....synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to cancel it, reduce the number of clients to send renews.this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;updateRenewsPerMinThreshold();}} }

    PeerAwareInstanceRegistryImpl.register

    當有新的服務提供者注冊到eureka-server上時,需要增加續約的客戶端數量,所以在register方法中會進行處理

    register ->super.register(AbstractInstanceRegistry) public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {//.... // The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}} }

    每隔15分鐘刷新自我保護閾值

    PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask

    每隔15分鐘,更新一次自我保護閾值!

    private void updateRenewalThreshold() {try {// 1. 計算應用實例數Applications apps = eurekaClient.getApplications();int count = 0;for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {if (this.isRegisterable(instance)) {++count;}}}synchronized (lock) {// Update threshold only if the threshold is greater than the// current expected threshold or if self preservation is disabled.//當節點數量count大于最小續約數量時,或者沒有開啟自我保護機制的情況下,重新計算expectedNumberOfClientsSendingRenews和numberOfRenewsPerMinThresholdif ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)|| (!this.isSelfPreservationModeEnabled())) {this.expectedNumberOfClientsSendingRenews = count;updateRenewsPerMinThreshold();}}logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);} catch (Throwable e) {logger.error("Cannot update renewal threshold", e);} }

    自我保護機制的觸發

    在?AbstractInstanceRegistry?的?postInit?方法中,會開啟一個?EvictionTask?的任務,這個任務用來檢測是否需要開啟自我保護機制。

    這個方法也是在EurekaServerBootstrap方法啟動時觸發。

    protected void postInit() {renewsLastMin.start(); //開啟一個定時任務,用來實現每分鐘的續約數量,每隔60s歸0重新計算if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}evictionTaskRef.set(new EvictionTask()); //啟動一個定時任務EvictionTask,每隔60s執行一次evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs()); }

    其中,EvictionTask的代碼如下。

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);@Override public void run() {try {//獲取補償時間毫秒數long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);} }

    evict方法

    public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");// 是否需要開啟自我保護機制,如果需要,那么直接RETURE, 不需要繼續往下執行了if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}//這下面主要是做服務自動下線的操作的。 }

    isLeaseExpirationEnabled

    numberOfRenewsPerMinThreshold public boolean isLeaseExpirationEnabled() {if (!isSelfPreservationModeEnabled()) {// The self preservation mode is disabled, hence allowing the instances to expire.return true;}return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }

    ?

    總結

    以上是生活随笔為你收集整理的Spring Cloud Eureka源码分析之心跳续约及自我保护机制的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。