Eureka入口之DiscoveryClient
首先我們用EurekaClient 就必須用到圖上的兩個注解之一。這兩個注解有什么關系呢?
兩個注解
這邊很明顯的 EnableEurekaClient 注解里面包含了EnableDiscoveryClient注解
其他的注解和EnableDiscoveryClient 中的一樣, 這樣我是否可以理解為只要看EnableDiscoveryClient 就行了。
這里網上看過上面兩個注解在使用eureka 作為注冊中心是沒什么區別的,EnableEurekaClient只能用于eureka。
CAP定理,eureka的優點,與dubbo 之間的一些差異
EnableDiscoveryClient 可以用eureka(ap),zk(cp),consoul(ca)等等。zk 具有強一致性, 但是當master死掉的時候需要等重新選舉出一個master 才能繼續工作。eureka具有高可用性, 每一個eurekaserver 都可以單獨使用的,他們之間是靠copy replicate 來進行數據更新的,當一個server掛掉之后,我們可以直接從別的server拿取信息。這里我想寫一下dubbo 使用zk和 sb 集成eureka的心跳區別。dubbo 是cp之間的心跳具體我們看源碼中netty中的url 指向的是provoid ,然后有個心跳。eureka 是c 和s 之間的心跳, 好像是在續租那塊。
dubbo的服務在注冊中心死亡后還是能夠調用,這是因為dubbo 會將zk 上的信息存一份在本地, 調用的時候也是調用的本地的,當有一個新服務注冊到zk上的時候,此服務先從zk 上拉出各個client 的信息, 而zk會將此服務的接口推送給其他已經注冊到zk上的服務。注冊中心掛掉還能調用的解析
為什么會加載@EnableDiscoveryClient到ioc 中呢?具體網上搜索spring.factories 文件的作用
一般看注解的會執行那個類 我們只需要忽略掉Enable 就行。
源碼解析
所以我們crtl+shift+t搜索 DiscoveryClient 。這時會搜索到兩個 一個是類一個是接口, 我們選擇哪個呢? 看兩個類的報名發現接口類的包名和注解是一樣的,所以我們選擇接口。
crtl+T我們看到下圖所示:
我們看的是Eureka啊 , 直接無腦點EurekaDiscoveryClient, 進入里面我們會看到可愛的EurekaClient 作為EurekaDiscoveryClient的一個屬性,這個時候crtl+t 會來到DiscoveryClient 這個類中, 注意不要和上面同名的接口搞混。當DiscoveryClient被new出來的時候,找到Discovery的構造方法,我打的斷點里面 會首先走到下圖的代碼大概是DiscoveryClient的第371行左右:
// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); // use direct handoff首先這邊會新建個調度器核心線程數為2,后面的參數為構建模式建立。后面新建了兩個Executor執行器。顧名思義一個心跳,一個清緩存。后面的很多方法我沒有去看,最主要的是這個方法initScheduledTasks() (code6);
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}registryFetchIntervalSeconds這個是每過多長時間去EurekaServer 上去拿實例 ,expBackOffBound為緩存刷新重試延遲時間的最大乘數值,具體怎么使用我們進入TimedSupervisorTask,這個類的初始化參數的意思 我們可以將鼠標放置上面查看:
當我們進入之后(code7):
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {this.scheduler = scheduler;this.executor = executor;this.timeoutMillis = timeUnit.toMillis(timeout);this.task = task;this.delay = new AtomicLong(timeoutMillis);this.maxDelay = timeoutMillis * expBackOffBound;// Initialize the counters and register.timeoutCounter = Monitors.newCounter("timeouts");rejectedCounter = Monitors.newCounter("rejectedExecutions");throwableCounter = Monitors.newCounter("throwables");threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());Monitors.registerObject(name, this);}maxDelay是10*毫秒, 還有Monitors 來監控超時,異常等的數量,delay 是AtomicLong 說明這個是原子的,在同一個時間只能有一個線程對他進行操作。我們再看看他的run()方法就特別的有意思:
public void run() {Future future = null;try {future = executor.submit(task);threadPoolLevelGauge.set((long) executor.getActiveCount());future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeoutdelay.set(timeoutMillis);threadPoolLevelGauge.set((long) executor.getActiveCount());} catch (TimeoutException e) {logger.error("task supervisor timed out", e);timeoutCounter.increment();long currentDelay = delay.get();long newDelay = Math.min(maxDelay, currentDelay * 2);delay.compareAndSet(currentDelay, newDelay);} catch (RejectedExecutionException e) {if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn("task supervisor shutting down, reject the task", e);} else {logger.error("task supervisor rejected the task", e);}rejectedCounter.increment();} catch (Throwable e) {if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn("task supervisor shutting down, can't accept the task");} else {logger.error("task supervisor threw an exception", e);}throwableCounter.increment();} finally {if (future != null) {future.cancel(true);}if (!scheduler.isShutdown()) {scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);}}}future 說明是異步的執行任務,如果出現異常我們就會加大延遲時間 最大不會草果maxDelay,并且Counter類型會+1,最有意思的是這個finally 他繼續的調用 調度器,使得調度器能夠不停的執行下去。
getDiscoveryServiceUrls()這個方法,繼續走會調到EndpointUtil中的getServiceUrlsFromConfig()方法,進入之后(code1)
ist<String> orderedUrls = new ArrayList<String>();String region = getRegion(clientConfig);String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());if (availZones == null || availZones.length == 0) {availZones = new String[1];availZones[0] = DEFAULT_ZONE;}?
首先 我們看到getRegion()方法,clientConfig他是根據application.yml 配置來獲取配置信息的如果我們配置了就取出來, 如果沒有我們就返回一個默認值具體看如下代碼(code2):
public static String getRegion(EurekaClientConfig clientConfig) {String region = clientConfig.getRegion();if (region == null) {region = DEFAULT_REGION;}region = region.trim().toLowerCase();return region;}我們在看看code1代碼塊中的這句話:
?String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
點進去(code3):
public String[] getAvailabilityZones(String region) {String value = this.availabilityZones.get(region);if (value == null) {value = DEFAULT_ZONE;}return value.split(",");}@Overridepublic List<String> getEurekaServerServiceUrls(String myZone) {String serviceUrls = this.serviceUrl.get(myZone);if (serviceUrls == null || serviceUrls.isEmpty()) {serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);}if (!StringUtils.isEmpty(serviceUrls)) {final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);for (String eurekaServiceUrl : serviceUrlsSplit) {if (!endsWithSlash(eurekaServiceUrl)) {eurekaServiceUrl += "/";}eurekaServiceUrls.add(eurekaServiceUrl);}return eurekaServiceUrls;}return new ArrayList<>();}我們看到如果配置文件里面不配置的話,會返回值為DEFAULT_ZONE,如果賦值了, 這邊會用,分割成一個數組, 說明我們可以配置多個地址并且用逗號分割。如果返回Default_zone的時候 serviceUrls會根據這個key 取值 我找到了 這一串代碼(code4)
private Map<String, String> serviceUrl = new HashMap<>();{this.serviceUrl.put(DEFAULT_ZONE, DEFAULT_URL);}所以這個Default_url是什么呢?我們繼續跟蹤得到(code5);
public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX+ "/";public static final String DEFAULT_PREFIX = "/eureka";這邊也就很好的解釋了。為什么不去配置 eurekaClient 為什么能夠連接本地的8761端口的EurekaServer.(先去跑步有空再寫)
總結
以上是生活随笔為你收集整理的Eureka入口之DiscoveryClient的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SAP中的client
- 下一篇: NRF24L01的学习浅析(通过软件模拟