2. Spring Cloud Study Notes (2): Spring Cloud Eureka Service Registry
1. Introduction
1.1 Overview
Service Discovery is one of the key tenets of a microservice-based architecture. Trying to hand-configure each client or some form of convention can be difficult to do and can be brittle. Eureka is the Netflix Service Discovery Server and Client. The server can be configured and deployed to be highly available, with each server replicating state about the registered services to the others.
1.2 Features
The Eureka server does not have a back end store, but the service instances in the registry all have to send heartbeats to keep their registrations up to date (so this can be done in memory). Clients also have an in-memory cache of Eureka registrations (so they do not have to go to the registry for every request to a service).
By default, every Eureka server is also a Eureka client and requires (at least one) service URL to locate a peer. If you do not provide it, the service runs and works, but it fills your logs with a lot of noise about not being able to register with the peer.
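The heartbeat requirement described above can be illustrated with a small in-memory registry. This is only a sketch under stated assumptions: the class LeaseRegistrySketch, its methods, and the 90-second lease are hypothetical names and values chosen for illustration, not Eureka's actual classes or configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Eureka's real classes: an in-memory registry
// whose entries survive only while heartbeats keep arriving.
class LeaseRegistrySketch {
    private final long leaseMillis;
    // instanceId -> timestamp (ms) of the last registration or heartbeat
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    LeaseRegistrySketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    void register(String instanceId, long now) {
        lastHeartbeat.put(instanceId, now);
    }

    // Returns false when the instance is unknown and would have to register again.
    boolean heartbeat(String instanceId, long now) {
        return lastHeartbeat.computeIfPresent(instanceId, (id, old) -> now) != null;
    }

    // Removes every instance whose lease has run out and returns the removed ids.
    List<String> evictExpired(long now) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            boolean expired = now - entry.getValue() > leaseMillis;
            if (expired && lastHeartbeat.remove(entry.getKey(), entry.getValue())) {
                evicted.add(entry.getKey());
            }
        }
        return evicted;
    }

    boolean isRegistered(String instanceId) {
        return lastHeartbeat.containsKey(instanceId);
    }

    public static void main(String[] args) {
        LeaseRegistrySketch registry = new LeaseRegistrySketch(90_000);
        registry.register("provider-1", 0);
        registry.register("provider-2", 0);
        registry.heartbeat("provider-1", 60_000);            // only provider-1 renews its lease
        System.out.println(registry.evictExpired(100_000));  // prints [provider-2]
    }
}
```

Because nothing is persisted, a restart of such a registry loses all entries, and they reappear only as instances send their next registration or heartbeat. That is why an in-memory store is sufficient for this design.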
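2. Demo Environment
3. Demo Code
- nfx-eureka-client: Eureka clients that register with the Eureka server:
  - user-api: shared API module that defines the entities and interfaces;
  - user-service-provider: service provider, registers with the Eureka server;
  - user-service-consumer: service consumer, registers with the Eureka server;
- nfx-eureka-server: Eureka server, responsible for service registration and discovery.
3.1 nfx-eureka-server
3.1.1 Code description
The Eureka server: clients register with it, and it also provides service discovery.
3.1.2 Maven dependencies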
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>
3.1.3 Configuration file
spring.application.name=nfx-eureka-server
# Port
server.port=9090
# Host name of the service registry
eureka.instance.hostname=localhost
# Whether to register this instance with Eureka
eureka.client.register-with-eureka=false
# Whether to fetch the registry
eureka.client.fetch-registry=false
# Eureka server address
eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/
3.1.4 Java code
NetflixEurekaServerApplication.java
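import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

// @EnableEurekaServer declares this application as a Eureka server
@EnableEurekaServer
@SpringBootApplication
public class NetflixEurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NetflixEurekaServerApplication.class, args);
    }
}
3.2 nfx-eureka-client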
3.2.1 user-api
3.2.1.1 Code description
Shared model and interface definitions.
3.2.1.2 Java code
UserModel.java
public class UserModel {

    private Long id;
    private String name;
    private Integer age;
    private String birthday;
    private String address;
    private String phone;

    public UserModel() {
    }

    public UserModel(Long id, String name, Integer age, String birthday, String address, String phone) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.birthday = birthday;
        this.address = address;
        this.phone = phone;
    }

    // getters, setters and toString omitted
}
UserService.java
import java.util.List;

public interface UserService {

    List<UserModel> findAll();

    UserModel findById(Long id);

    UserModel add(UserModel userModel);

    UserModel update(UserModel userModel);

    UserModel deleteById(Long id);
}
3.2.2 user-service-provider
3.2.2.1 Code description
Service provider: depends on user-api and implements its interfaces; registers with the Eureka server.
3.2.2.2 Maven dependencies
<dependencies>
    <dependency>
        <groupId>com.soulballad.usage</groupId>
        <artifactId>user-api</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
3.2.2.3 Configuration file
spring.application.name=eureka-client-provider
server.port=8090
eureka.server.host=localhost
eureka.server.port=9090
eureka.client.service-url.defaultZone=http://${eureka.server.host}:${eureka.server.port}/eureka/
3.2.2.4 Java code
UserRepository.java
@Repository
public class UserRepository {

    // Two records are preloaded, so the id generator starts at 2
    private static final AtomicLong ID_GENERATOR = new AtomicLong(2);

    // Simulates database operations
    private static final Map<Long, UserModel> USER_MAP = new HashMap<>();

    @PostConstruct
    public void init() {
        UserModel user1 = new UserModel(1L, "zhangsan", 20, "2000-01-02", "beijing", "13666666666");
        UserModel user2 = new UserModel(2L, "lisi", 30, "1990-03-23", "shanghai", "13888888888");
        USER_MAP.put(user1.getId(), user1);
        USER_MAP.put(user2.getId(), user2);
    }

    public List<UserModel> findAll() {
        return new ArrayList<>(USER_MAP.values());
    }

    // Returns an empty UserModel when the id is unknown
    public UserModel findById(Long id) {
        return USER_MAP.getOrDefault(id, new UserModel());
    }

    public UserModel add(UserModel userModel) {
        long id = ID_GENERATOR.incrementAndGet();
        userModel.setId(id);
        USER_MAP.put(id, userModel);
        return userModel;
    }

    public UserModel update(UserModel userModel) {
        USER_MAP.put(userModel.getId(), userModel);
        return USER_MAP.get(userModel.getId());
    }

    // Returns the removed user, or null when the id is unknown
    public UserModel deleteById(Long id) {
        return USER_MAP.remove(id);
    }
}
UserServiceImpl.java
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Override
    public List<UserModel> findAll() {
        return userRepository.findAll();
    }

    @Override
    public UserModel findById(Long id) {
        return userRepository.findById(id);
    }

    @Override
    public UserModel add(UserModel userModel) {
        return userRepository.add(userModel);
    }

    @Override
    public UserModel update(UserModel userModel) {
        return userRepository.update(userModel);
    }

    @Override
    public UserModel deleteById(Long id) {
        return userRepository.deleteById(id);
    }
}
UserProviderController.java
@RestController
@RequestMapping(value = "/provider/user")
public class UserProviderController {

    @Autowired
    private UserService userService;

    @GetMapping(value = "/list")
    public List<UserModel> list() {
        return userService.findAll();
    }

    @GetMapping(value = "/query/{id}")
    public UserModel query(@PathVariable Long id) {
        return userService.findById(id);
    }

    @PostMapping(value = "/add")
    public UserModel add(@RequestBody UserModel userModel) {
        return userService.add(userModel);
    }

    @PutMapping(value = "/update")
    public UserModel update(@RequestBody UserModel userModel) {
        return userService.update(userModel);
    }

    @DeleteMapping(value = "/delete/{id}")
    public UserModel deleteById(@PathVariable Long id) {
        return userService.deleteById(id);
    }
}
3.2.3 user-service-consumer
3.2.3.1 Code description
Service consumer: depends on user-api and calls its interfaces; registers with the Eureka server.
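The consumer in this module addresses the provider by its service name (http://eureka-client-provider) rather than by a fixed host and port, and relies on a load-balanced RestTemplate to pick a concrete instance. As a rough illustration of that idea, the sketch below resolves a logical URI against a list of known instances in round-robin order. RoundRobinResolverSketch is a hypothetical class, the second instance on port 8091 is an assumption made up for the example, and Spring Cloud's real load balancer is considerably more involved.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of client-side load balancing; not Spring Cloud's implementation.
class RoundRobinResolverSketch {
    // service name -> known instances ("host:port"), as a client-side registry cache might hold them
    private final Map<String, List<String>> instancesByService;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinResolverSketch(Map<String, List<String>> instancesByService) {
        this.instancesByService = instancesByService;
    }

    // Replaces the logical service name in the URI with the next instance's host and port.
    URI resolve(URI logical) {
        List<String> instances = instancesByService.get(logical.getHost());
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("No instances available for " + logical.getHost());
        }
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        String query = logical.getRawQuery() == null ? "" : "?" + logical.getRawQuery();
        return URI.create(logical.getScheme() + "://" + instances.get(index) + logical.getRawPath() + query);
    }

    public static void main(String[] args) {
        // localhost:8091 is an assumed second provider instance, for illustration only
        RoundRobinResolverSketch resolver = new RoundRobinResolverSketch(
                Map.of("eureka-client-provider", List.of("localhost:8090", "localhost:8091")));
        URI logical = URI.create("http://eureka-client-provider/provider/user/list");
        System.out.println(resolver.resolve(logical)); // http://localhost:8090/provider/user/list
        System.out.println(resolver.resolve(logical)); // http://localhost:8091/provider/user/list
    }
}
```

In the demo only one provider instance is started, so every call would resolve to localhost:8090; the rotation matters once several instances register under the same application name.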
3.2.3.2 Maven dependencies
<dependencies>
    <dependency>
        <groupId>com.soulballad.usage</groupId>
        <artifactId>user-api</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
3.2.3.3 Configuration file
spring.application.name=eureka-client-consumer
server.port=8080
eureka.server.host=localhost
eureka.server.port=9090
eureka.client.service-url.defaultZone=http://${eureka.server.host}:${eureka.server.port}/eureka/
3.2.3.4 Java code
UserServiceProxy.java
@Service
public class UserServiceProxy implements UserService {

    // spring.application.name from user-service-provider's application.properties, plus the path prefix
    private static final String USER_PROVIDER_PREFIX = "http://eureka-client-provider" + "/provider/user";

    // Declared as a bean in UserServiceConsumerApplication
    @Autowired
    private RestTemplate restTemplate;

    @Override
    public List<UserModel> findAll() {
        UserModel[] userArray = restTemplate.getForObject(USER_PROVIDER_PREFIX + "/list", UserModel[].class);
        return Arrays.asList(userArray != null ? userArray : new UserModel[0]);
    }

    @Override
    public UserModel findById(Long id) {
        return restTemplate.getForObject(USER_PROVIDER_PREFIX + "/query/{id}", UserModel.class, id);
    }

    @Override
    public UserModel add(UserModel userModel) {
        return restTemplate.postForObject(USER_PROVIDER_PREFIX + "/add", userModel, UserModel.class);
    }

    @Override
    public UserModel update(UserModel userModel) {
        // RestTemplate#put returns nothing, so read the updated user back
        restTemplate.put(USER_PROVIDER_PREFIX + "/update", userModel);
        return findById(userModel.getId());
    }

    @Override
    public UserModel deleteById(Long id) {
        // RestTemplate#delete returns nothing, so fetch the user before deleting it
        UserModel userModel = findById(id);
        restTemplate.delete(USER_PROVIDER_PREFIX + "/delete/{id}", id);
        return userModel;
    }
}
UserConsumerController.java
@RestController
@RequestMapping(value = "/consumer/user")
public class UserConsumerController {

    @Autowired
    private UserService userService;

    @GetMapping(value = "/list")
    public List<UserModel> list() {
        return userService.findAll();
    }

    @GetMapping(value = "/query/{id}")
    public UserModel query(@PathVariable Long id) {
        return userService.findById(id);
    }

    @PostMapping(value = "/add")
    public UserModel add(@RequestBody UserModel userModel) {
        return userService.add(userModel);
    }

    @PutMapping(value = "/update")
    public UserModel update(@RequestBody UserModel userModel) {
        return userService.update(userModel);
    }

    @DeleteMapping(value = "/delete/{id}")
    public UserModel deleteById(@PathVariable Long id) {
        return userService.deleteById(id);
    }
}
UserServiceConsumerApplication.java
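import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableDiscoveryClient
@SpringBootApplication
public class UserServiceConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceConsumerApplication.class, args);
    }

    // Load-balanced RestTemplate: resolves service names to registered instances
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
3.3 Git repository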
spring-cloud-nfx-02-eureka: a distributed service registry built by integrating Spring Cloud with Eureka
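4. Results
4.1 nfx-eureka-server
Start the Eureka server nfx-eureka-server and open http://localhost:9090. The Eureka dashboard is displayed.
No services have registered with nfx-eureka-server yet.
4.2 user-service-provider
Next, start user-service-provider and open http://localhost:9090 again. The service provider is now registered.
In netflix-eureka-client-provider, call the following endpoints and check that the output matches expectations.
List users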
### GET /provider/user/list
GET http://localhost:8090/provider/user/list
Accept: application/json
Query a user by id
### GET /provider/user/query/{id}
GET http://localhost:8090/provider/user/query/1
Accept: application/json
Add a user
### POST /provider/user/add
POST http://localhost:8090/provider/user/add
Accept: application/json
Content-Type: application/json

{
  "name": "wangwu",
  "age": 20,
  "birthday": "2000-01-01",
  "address": "wuhan",
  "phone": "15999999999"
}
Update a user
### PUT /provider/user/update
PUT http://localhost:8090/provider/user/update
Accept: application/json
Content-Type: application/json

{
  "id": 2,
  "name": "lisi",
  "age": 40,
  "birthday": "1980-01-01",
  "address": "guangzhou",
  "phone": "13888888888"
}
Delete a user by id
### DELETE /provider/user/delete/{id}
DELETE http://localhost:8090/provider/user/delete/3
Accept: application/json
As shown, all endpoints exposed by user-service-provider work correctly.
4.3 user-service-consumer
Next, start user-service-consumer and open http://localhost:9090 again. The service consumer is now registered as well.
In netflix-eureka-client-consumer, call the following endpoints and check that the output matches expectations.
List users
### GET /consumer/user/list
GET http://localhost:8080/consumer/user/list
Accept: application/json
Query a user by id
### GET /consumer/user/query/{id}
GET http://localhost:8080/consumer/user/query/1
Accept: application/json
Add a user
### POST /consumer/user/add
POST http://localhost:8080/consumer/user/add
Accept: application/json
Content-Type: application/json

{
  "name": "wangwu",
  "age": 20,
  "birthday": "2000-01-01",
  "address": "wuhan",
  "phone": "15999999999"
}
Update a user
### PUT /consumer/user/update
PUT http://localhost:8080/consumer/user/update
Accept: application/json
Content-Type: application/json

{
  "id": 2,
  "name": "lisi",
  "age": 40,
  "birthday": "1980-01-01",
  "address": "shanghang-pudong",
  "phone": "13888888888"
}
Delete a user by id
### DELETE /consumer/user/delete/{id}
DELETE http://localhost:8080/consumer/user/delete/4
Accept: application/json
5. Source Code Analysis
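5.1 How does the Eureka server start?
Using @EnableEurekaServer activates the EurekaServerMarkerConfiguration configuration class, which registers a marker bean. The Eureka server's auto-configuration class, EurekaServerAutoConfiguration, takes effect only when that marker is present, and it declares EurekaController through a @Bean method: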
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
}
This EurekaController is the Eureka admin console served at http://localhost:9090/. It is implemented with Spring MVC.
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {

    @Value("${eureka.dashboard.path:/}")
    private String dashboardPath = "";

    private ApplicationInfoManager applicationInfoManager;

    public EurekaController(ApplicationInfoManager applicationInfoManager) {
        this.applicationInfoManager = applicationInfoManager;
    }

    @RequestMapping(method = RequestMethod.GET)
    public String status(HttpServletRequest request, Map<String, Object> model) {
        populateBase(request, model);
        populateApps(model);
        StatusInfo statusInfo;
        try {
            statusInfo = new StatusResource().getStatusInfo();
        }
        catch (Exception e) {
            statusInfo = StatusInfo.Builder.newBuilder().isHealthy(false).build();
        }
        model.put("statusInfo", statusInfo);
        populateInstanceInfo(model, statusInfo);
        filterReplicas(model, statusInfo);
        return "eureka/status";
    }
}
[Figure: dependency graph of spring-cloud-starter-netflix-eureka-server]
5.2 How does the Eureka client register?
When the service starts, refreshing the application context starts all Lifecycle beans. EurekaAutoServiceRegistration implements Lifecycle, so its start method is called, and start registers the service by calling serviceRegistry.register.
Here serviceRegistry is an EurekaServiceRegistry, which implements the ServiceRegistry interface. ServiceRegistry is defined in spring-cloud-commons as a generic interface; depending on the registry in use, the implementation can also be ConsulServiceRegistry, NacosServiceRegistry, ZookeeperServiceRegistry, and so on.
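The relationship between the lifecycle callback and the pluggable registry can be sketched in plain Java. Everything below is a simplified, hypothetical stand-in: SimpleServiceRegistry, InMemoryRegistry, and AutoRegistration are names invented for this illustration and are not the Spring Cloud types, whose signatures differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; none of these are the Spring Cloud types.
class AutoRegistrationSketch {

    // A registry-agnostic contract, in the spirit of a generic service-registry interface.
    interface SimpleServiceRegistry {
        void register(String serviceId);

        void deregister(String serviceId);
    }

    // One possible backend; other backends would implement the same contract.
    static class InMemoryRegistry implements SimpleServiceRegistry {
        final List<String> registered = new ArrayList<>();

        @Override
        public void register(String serviceId) {
            registered.add(serviceId);
        }

        @Override
        public void deregister(String serviceId) {
            registered.remove(serviceId);
        }
    }

    // Plays the role of a lifecycle component: a container would call start() and stop().
    static class AutoRegistration {
        private final SimpleServiceRegistry registry;
        private final String serviceId;
        private boolean running;

        AutoRegistration(SimpleServiceRegistry registry, String serviceId) {
            this.registry = registry;
            this.serviceId = serviceId;
        }

        // Registers once; repeated calls are ignored while running.
        void start() {
            if (!running) {
                registry.register(serviceId);
                running = true;
            }
        }

        // Deregisters on shutdown.
        void stop() {
            if (running) {
                registry.deregister(serviceId);
                running = false;
            }
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        AutoRegistration registration = new AutoRegistration(registry, "eureka-client-provider");
        registration.start();
        System.out.println(registry.registered); // [eureka-client-provider]
        registration.stop();
        System.out.println(registry.registered); // []
    }
}
```

The point of the indirection is that the lifecycle component never needs to know which registry it talks to, so swapping the backend means supplying a different implementation of the same interface.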
EurekaServiceRegistry#register
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);
    if (log.isInfoEnabled()) {
        log.info("Registering application "
                + reg.getApplicationInfoManager().getInfo().getAppName()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }
    // Set the instance status
    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    // Health check
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
setInstanceStatus notifies the listeners by calling listener.notify
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }
    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
The listeners here are registered through the registerStatusChangeListener method
public void registerStatusChangeListener(StatusChangeListener listener) {
    listeners.put(listener.getId(), listener);
}
This method is called in DiscoveryClient
private ApplicationInfoManager.StatusChangeListener statusChangeListener;

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
The listener here is an ApplicationInfoManager.StatusChangeListener, so its notify method is invoked, which in turn calls
instanceInfoReplicator.onDemandUpdate()
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            // Submit a task
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
This eventually calls InstanceInfoReplicator.this.run(), which registers through discoveryClient.register
// InstanceInfoReplicator#run
public void run() {
    try {
        // Refresh the instance info
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

// DiscoveryClient#register
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
Jersey is used here to make the HTTP call, sending a POST request
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // Request path is apps/EUREKA-CLIENT-PROVIDER or apps/EUREKA-CLIENT-CONSUMER
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        // POST request with a JSON media type
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
5.3 How the Eureka server handles the registration request
After the client sends the request, it arrives at ApplicationResource#addInstance
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    // Parameter validation
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // Register
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
The registry here is a PeerAwareInstanceRegistry.
[Figure: class diagram of PeerAwareInstanceRegistry]
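The registry that addInstance delegates to keeps its data in memory as a two-level map: application name first, then instance id. The sketch below shows that shape together with the rule that an existing entry with a strictly newer dirty timestamp is kept over the incoming one. It is a minimal illustration under assumptions: TwoLevelRegistrySketch and Instance are hypothetical types, leases and replication are left out, and computeIfAbsent and merge are used for brevity where the real code uses putIfAbsent and explicit checks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; the types are simplified stand-ins, not Eureka's classes.
class TwoLevelRegistrySketch {

    // Minimal stand-in for an instance record.
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance)
    private final Map<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    // Stores the registrant unless an existing entry is strictly newer; returns the record that is kept.
    Instance register(Instance registrant) {
        Map<String, Instance> instances =
                registry.computeIfAbsent(registrant.appName, name -> new ConcurrentHashMap<>());
        // On equal timestamps the incoming record wins, hence ">" rather than ">="
        return instances.merge(registrant.id, registrant,
                (existing, incoming) ->
                        existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp ? existing : incoming);
    }

    int instanceCount(String appName) {
        Map<String, Instance> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        TwoLevelRegistrySketch registry = new TwoLevelRegistrySketch();
        registry.register(new Instance("EUREKA-CLIENT-PROVIDER", "localhost:8090", 200L));
        // A stale registration (older dirty timestamp) does not replace the newer record
        Instance kept = registry.register(new Instance("EUREKA-CLIENT-PROVIDER", "localhost:8090", 100L));
        System.out.println(kept.lastDirtyTimestamp);                          // 200
        System.out.println(registry.instanceCount("EUREKA-CLIENT-PROVIDER")); // 1
    }
}
```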
The register method that is ultimately invoked is in AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Get the current app's instance map from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // Increment the registration counter
        REGISTER.increment(isReplication);
        // On first registration, initialize a ConcurrentHashMap
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Get any existing lease from gMap
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Build a lease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the instance status is UP, mark the lease as up
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Set the action type to ADDED
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate the cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
Once register completes, the replicateToPeers() method replicates the information across the cluster nodes
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the instance action to each node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
replicateInstanceActionsToPeers is implemented as follows
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        // Dispatch on the action type
        switch (action) {
            case Cancel:
                // Cancel the registration
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // Register
                node.register(info);
                break;
            case StatusUpdate:
                // Status update
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                // Delete the status override
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
6. References