日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Nacos源码—1.Nacos服务注册发现分析一

發布時間:2025/5/22 编程问答 21 如意码农
生活随笔 收集整理的這篇文章主要介紹了 Nacos源码—1.Nacos服务注册发现分析一 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大綱

1.客戶端如何發起服務注冊 + 發送服務心跳

2.服務端如何處理客戶端的服務注冊請求

3.注冊服務—如何實現高并發支撐上百萬服務注冊

4.內存注冊表—如何處理注冊表的高并發讀寫沖突

1.客戶端如何發起服務注冊 + 發送服務心跳

(1)Nacos客戶端項目啟動時為什么會自動注冊服務

(2)Nacos客戶端通過什么方式注冊服務

(3)Nacos客戶端如何發送服務心跳

(1)Nacos客戶端項目啟動時為什么會自動注冊服務

Nacos客戶端就是引入了nacos-discovery + nacos-client依賴的項目。引入spring-cloud-starter-alibaba-nacos-discovery后,才自動注冊服務。查看這個依賴包中的spring.factories文件,發現有一些Configuration類。

Spring Boot啟動時會掃描spring.factories文件,然后創建里面的配置類。

在spring.pactories文件中,與注冊相關的類就是:NacosServiceRegistryAutoConfiguration這個Nacos服務注冊自動配置類。

Nacos服務注冊自動配置類NacosServiceRegistryAutoConfiguration如下,該配置類創建了三個Bean。

第一個Bean:NacosServiceRegistry

這個Bean在創建時,會傳入加載了yml配置文件內容的類NacosDiscoveryProperties。

第二個Bean:NacosRegistration

這個Bean在創建時,會傳入加載了yml配置文件內容的類NacosDiscoveryProperties。

第三個Bean:NacosAutoServiceRegistration

這個Bean在創建時,會傳入NacosServiceRegistry和NacosRegistration兩個Bean。然后該Bean繼承了AbstractAutoServiceRegistration抽象類。該抽象類實現了ApplicationListener接口,所以項目啟動時便是利用了Spring的監聽事件來實現自動注冊服務的。因為在Spring容器啟動的最后會執行finishRefresh()方法,然后會發布一個事件,該事件會觸發調用onApplicationEvent()方法。

調用AbstractAutoServiceRegistration的onApplicationEvent()方法時,首先會調用AbstractAutoServiceRegistration的bind()方法,然后調用AbstractAutoServiceRegistration的start()方法,接著調用AbstractAutoServiceRegistration的register()方法發起注冊,也就是調用this.serviceRegistry的register()方法完成服務注冊的具體工作。

其中,AbstractAutoServiceRegistration的serviceRegistry屬性,是在服務注冊自動配置類NacosServiceRegistryAutoConfiguration,創建第三個Bean—NacosAutoServiceRegistration時,通過傳入其創建的第一個Bean—NacosServiceRegistry進行賦值的。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
//傳入NacosDiscoveryProperties作為參數
return new NacosServiceRegistry(nacosDiscoveryProperties);
} @Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
//傳入NacosDiscoveryProperties作為參數
return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
} @Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
//傳入NacosServiceRegistry和NacosRegistration作為參數
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
} @ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
//nacos discovery server address.
private String serverAddr;
//the nacos authentication username.
private String username;
//the nacos authentication password.
private String password;
//namespace, separation registry of different environments.
private String namespace;
//service name to registry.
@Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
private String service;
//cluster name for nacos.
private String clusterName = "DEFAULT";
//group name for nacos.
private String group = "DEFAULT_GROUP";
//The ip address your want to register for your service instance, needn't to set it if the auto detect ip works well.
private String ip;
//The port your want to register for your service instance, needn't to set it if the auto detect port works well.
private int port = -1;
//Heart beat interval. Time unit: millisecond.
private Integer heartBeatInterval;
//Heart beat timeout. Time unit: millisecond.
private Integer heartBeatTimeout;
//If instance is ephemeral.The default value is true.
private boolean ephemeral = true;
...
} public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
...
private NacosRegistration registration;
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
...
} public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
...
private final ServiceRegistry<R> serviceRegistry;
private AutoServiceRegistrationProperties properties; protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
... @Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
} public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
} public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
//only initialize if nonSecurePort is greater than 0 and it isn't already running
//because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
//發起注冊
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
} protected void register() {
//調用創建NacosAutoServiceRegistration時傳入的NacosServiceRegistry實例的register()方法
this.serviceRegistry.register(getRegistration());
}
...
} public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private final NacosDiscoveryProperties nacosDiscoveryProperties; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
} @Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
} NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration); try {
//把當前的服務實例注冊到Nacos中
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
} catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
//rethrow a RuntimeException if the registration is failed.
//issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
} private NamingService namingService() {
return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
}
...
}

Nacos客戶端項目啟動時自動觸發服務實例注冊的流程總結:Spring監聽器調用onApplicationEvent()方法 -> bind()方法 -> start()方法 -> register()方法,最后register()方法會調用serviceRegistry屬性的register()方法進行注冊。

整個流程具體來說就是:首先通過spring.factories文件,找到一個注冊相關的Configuration配置類,這個配置類里面定義了三個Bean對象。創建第三個Bean對象時,需要第一個、第二個Bean對象作為參數傳進去。第一個Bean對象里面就有真正進行服務注冊的register()方法,并且第一個Bean對象會賦值給第三個Bean對象中的serviceRegistry屬性。在第三個Bean對象的父類會實現Spring的監聽器方法。所以在Spring容器啟動時會發布監聽事件,從而觸發執行Nacos注冊邏輯。

(2)Nacos客戶端通過什么方式注冊服務

項目啟動時是通過NacosServiceRegistry的register()方法發起服務注冊的,然后會調用NacosNamingService的registerInstance()方法注冊服務實例,接著調用NamingProxy的registerService()方法組裝參數發起服務注冊請求,接著調用NamingProxy的reqApi()方法向Nacos服務端發起服務注冊請求,也就是調用NamingProxy的callServer()方法向Nacos服務端發送注冊請求。

在NamingProxy的callServer()方法中,首先會調用NacosRestTemplate的exchangeForm()方法發起HTTP請求,然后會調用this.requestClient()的execute()方法執行HTTP請求的發送,接著會調用DefaultHttpClientRequest的execute()方法處理請求的發送,也就是通過Apache的CloseableHttpClient組件來處理發送HTTP請求。

注意:NacosServiceRegistry是屬于nacos-discovery包中的類,NacosNamingService是屬于nacos-client包中的類。

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private final NacosDiscoveryProperties nacosDiscoveryProperties; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
} @Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
} NamingService namingService = namingService();
//服務名稱
String serviceId = registration.getServiceId();
//服務分組
String group = nacosDiscoveryProperties.getGroup();
//服務實例,包含了IP、Port等信息
Instance instance = getNacosInstanceFromRegistration(registration); try {
//調用NacosNamingService.registerInstance()方法把當前的服務實例注冊到Nacos中
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
} catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
rethrowRuntimeException(e);
}
} private NamingService namingService() {
return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
} private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
...
} public class NacosNamingService implements NamingService {
private BeatReactor beatReactor;
private NamingProxy serverProxy;
... @Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
//獲取分組服務名字
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//判斷要注冊的服務實例是否是臨時實例
if (instance.isEphemeral()) {
//如果是臨時實例,則構建心跳信息
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//接下來調用NamingProxy的注冊方法registerService()來注冊服務實例
serverProxy.registerService(groupedServiceName, groupName, instance);
}
...
} public class NamingProxy implements Closeable {
private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
... //register a instance to service with specified instance properties.
//@param serviceName name of service
//@param groupName group of service
//@param instance instance to register
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
//創建一個Map組裝注冊請求參數
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
//下面UtilAndComs常量類拼裝的請求url是: /Nacos/v1/ns/instance
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
} public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
} public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
return reqApi(api, params, body, getServerList(), method);
} //Request api.
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
} //Call server.
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader(); String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
} try {
//調用NacosRestTemplate.exchangeForm()方法發起HTTP請求
HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
...
} public class NacosRestTemplate extends AbstractNacosRestTemplate {
private final HttpClientRequest requestClient;
... //Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns the response as {@link HttpRestResult}.
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {
RequestHttpEntity requestHttpEntity = new RequestHttpEntity(header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
return execute(url, httpMethod, requestHttpEntity, responseType);
} private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
} ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
HttpClientResponse response = null;
try {
response = this.requestClient().execute(uri, httpMethod, requestEntity);
return responseHandler.handle(response);
} finally {
if (response != null) {
response.close();
}
}
} private HttpClientRequest requestClient() {
if (CollectionUtils.isNotEmpty(interceptors)) {
if (logger.isDebugEnabled()) {
logger.debug("Execute via interceptors :{}", interceptors);
}
return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
}
return requestClient;
}
...
} public class DefaultHttpClientRequest implements HttpClientRequest {
private final CloseableHttpClient client; public DefaultHttpClientRequest(CloseableHttpClient client) {
this.client = client;
} @Override
public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {
HttpRequestBase request = build(uri, httpMethod, requestHttpEntity);
//通過Apache的CloseableHttpClient組件執行HTTP請求
CloseableHttpResponse response = client.execute(request);
return new DefaultClientHttpResponse(response);
}
...
}

由此可知:Nacos客戶端是通過HTTP的方式往Nacos服務端發起服務注冊的,Nacos服務端會提供服務注冊的API接口給Nacos客戶端進行HTTP調用,Nacos官方Open API文檔中注冊服務實例的接口說明如下:

(3)Nacos客戶端如何發送服務心跳

調用NacosNamingService的registerInstance()方法注冊服務實例時,在調用NamingProxy的registerService()方法來注冊服務實例之前,會根據注冊的服務實例是臨時實例來構建和添加心跳信息到beatReactor,也就是調用BeatReactor的buildBeatInfo()方法和addBeatInfo()方法。

在BeatReactor的buildBeatInfo()方法中,會通過beatInfo的setPeriod()方法設置心跳間隔時間,默認是5秒。

在BeatReactor的addBeatInfo()方法中,倒數第二行會開啟一個延時執行的任務,執行的任務是根據心跳信息BeatInfo封裝的BeatTask。該BeatTask任務會交給BeatReactor的ScheduledExecutorService來執行,并通過beatInfo的getPeriod()方法獲取延時執行的時間為5秒。

在BeatTask的run()方法中,就會調用NamingProxy的sendBeat()方法發送心跳請求給Nacos服務端,也就是調用NamingProxy的reqApi()方法向Nacos服務端發起心跳請求。如果返回的心跳響應表明服務實例不存在則重新發起服務實例注冊請求。無論心跳響應如何,繼續根據心跳信息BeatInfo封裝一個BeatTask任務,然后將該任務交給線程池ScheduledExecutorService來延時5秒執行。

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private final NacosDiscoveryProperties nacosDiscoveryProperties; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
} @Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
} NamingService namingService = namingService();
//服務名稱
String serviceId = registration.getServiceId();
//服務分組
String group = nacosDiscoveryProperties.getGroup();
//服務實例,包含了IP、Port等信息
Instance instance = getNacosInstanceFromRegistration(registration); try {
//調用NacosNamingService.registerInstance()方法把當前的服務實例注冊到Nacos中
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
} catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
rethrowRuntimeException(e);
}
} private NamingService namingService() {
return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
} private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
...
} public class NacosNamingService implements NamingService {
private BeatReactor beatReactor;
private NamingProxy serverProxy;
... @Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
//獲取分組服務名字
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//判定要注冊的服務實例是否是臨時實例
if (instance.isEphemeral()) {
//如果是臨時實例,則構建心跳信息
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//接下來調用NamingProxy的注冊方法registerService()來注冊服務實例
serverProxy.registerService(groupedServiceName, groupName, instance);
}
...
} public class BeatReactor implements Closeable {
...
//Build new beat information.
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
//getInstanceHeartBeatInterval()的返回值是5000
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
...
} @JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {
...
public long getInstanceHeartBeatInterval() {
//Constants.DEFAULT_HEART_BEAT_INTERVAL,默認是5000
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
...
} public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final NamingProxy serverProxy;
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>(); public BeatReactor(NamingProxy serverProxy) {
this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
} public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
... //Add beat information.
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
//開啟一個延時執行的任務,執行的任務是BeatTask
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
... class BeatTask implements Runnable {
BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
} @Override
public void run() {
//判斷是否需要停止
if (beatInfo.isStopped()) {
return;
}
//獲取下一次執行的時間,同樣還是5s
long nextTime = beatInfo.getPeriod();
try {
//調用NamingProxy.sendBeat()方法發送心跳請求給Nacos服務端
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
//獲取Nacos服務端返回的code狀態碼
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
//如果code = RESOURCE_NOT_FOUND,沒有找到資源,那么表示之前注冊的信息,已經被Nacos服務端移除了
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
//然后重新組裝參數,重新發起注冊請求
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
//調用NamingProxy.registerService()方法發送服務實例注冊請求到Nacos服務端
serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
//把beatInfo又重新放入延遲任務當中,并且還是5秒,所以一直是個循環的狀態
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
} public class NamingProxy implements Closeable {
...
//Send beat.
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
...
}

由此可見,在客戶端在發起服務注冊期間,會開啟一個心跳健康檢查的延時任務,這個任務每間隔5s執行一次。任務內容就是通過HTTP請求調用發送Nacos提供的服務實例心跳接口。Nacos官方Open API文檔中服務實例心跳接口說明如下:

如下是客戶端發起服務注冊 + 發送服務心跳的整個流程圖:

2.服務端如何處理客戶端的服務注冊請求

(1)客戶端自動發送服務注冊請求梳理

(2)Nacos服務端處理服務請求的代碼入口

(3)Nacos服務端處理服務注冊請求的源碼分析

(4)服務端接收到服務實例注冊請求后的處理總結

(1)客戶端自動發送服務注冊請求梳理

首先,從spring-cloud-starter-alibaba-nacos-discovery中,發現在spring.factories文件定義了很多Configuration配置類,其中就包括了NacosServiceRegistryAutoConfiguration配置類。這個配置類會創建三個Bean對象,其中有個Bean對象便實現了一個監聽事件方法。

然后,Spring容器啟動時,會發布一個事件。這個事件會被名為NacosAutoServiceRegistration的Bean對象監聽到,從而自動發起Nacos服務注冊。在注冊時會開啟心跳健康延時任務,每隔5s執行一次。不管是服務注冊還是心跳檢查,都是通過HTTP方式調用Nacos服務端。

客戶端向服務端發起服務注冊請求是通過HTTP接口"/nacos/v1/ns/instance"來實現的,客戶端向服務端發起心跳請求是通過HTTP接口"/nacos/v1/ns/instance/beat"來實現的。

(2)Nacos服務端處理服務注冊請求的代碼入口

Nacos服務端有一個叫nacos-naming的模塊,這個nacos-naming模塊其實就是一個Spring Boot項目,模塊中的controllers包則是用來處理服務相關的HTTP請求。

由于服務端處理服務注冊請求的地址是"/nacos/v1/ns/instance",所以對服務實例進行處理的入口是controllers包下的InstanceController。InstanceController的代碼很好地遵守了Restful風格,其中的regsiter()方法注冊新服務實例對應@PostMapping、deregister()方法注銷服務實例對應@DeleteMapping、update()方法修改服務實例對應@PutMapping。雖然都可以使用@PostMapping,但Nacos就嚴格按照了Restful標準。

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
...
//Register new instance.
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
...
} //Deregister instances.
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
...
} //Update instance.
@CanDistro
@PutMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String update(HttpServletRequest request) throws Exception {
...
}
...
} public class UtilsAndCommons {
// ********************** Nacos HTTP Context ************************ \\
public static final String NACOS_SERVER_CONTEXT = "/nacos";
public static final String NACOS_SERVER_VERSION = "/v1";
public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";
public static final String NACOS_NAMING_CONTEXT = DEFAULT_NACOS_NAMING_CONTEXT;
...
}

(3)Nacos服務端處理服務注冊請求的源碼分析

對于Nacos客戶端的服務實例注冊請求,會由InstanceController的register()方法進行處理。該方法首先會從請求參數中獲取Instance服務實例,然后調用ServiceManager的registerInstance()方法來進行服務實例注冊。ServiceManager是Nacos的服務管理者,擁有所有的服務列表,可以通過它來管理所有服務的注冊、銷毀、修改等。

在ServiceManager的registerInstance()方法中:首先會通過調用ServiceManager的createEmptyService()方法創建一個空服務,然后通過ServiceManager的addInstance()方法添加注冊請求中的服務實例。

在ServiceManager的addInstance()方法中:首先構建出要注冊的服務實例對應的服務的key,然后使用synchronized鎖住要注冊的服務實例對應的服務,接著獲取要注冊的服務實例對應的服務的最新服務實例列表,最后執行DelegateConsistencyServiceImpl的put()方法更新服務實例列表。

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@Autowired
private ServiceManager serviceManager; ...
//Register new instance.
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//從request中獲取命名空間、服務名稱
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//從request中獲取Instance服務實例
final Instance instance = parseInstance(request);
//調用ServiceManager的注冊實例方法
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
...
} //服務管理者,擁有所有的服務列表,用于管理所有服務的注冊、銷毀、修改等
@Component
public class ServiceManager implements RecordListener<Service> {
//注冊表,Map(namespace, Map(group::serviceName, Service)).
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); @Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService; private final Object putServiceLock = new Object(); ...
//Register an instance to a service in AP mode.
//This method creates service or cluster silently if they don't exist.
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//1.創建一個空的服務
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//2.根據命名空間ID、服務名獲取一個服務,如果獲取結果為null則拋異常
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//3.添加服務實例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
} ...
//1.創建一個空服務
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
} //Create service if not exist.
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
//now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
} private void putServiceAndInit(Service service) throws NacosException {
//把Service放入注冊表serviceMap中
putService(service);
service.init();
//把Service作為監聽器添加到consistencyService的listeners中
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
} //Put service into manager.
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
} public void addOrReplaceService(Service service) throws NacosException {
consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
} ...
//2.根據命名空間ID、服務名獲取一個服務
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
} public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
} ...
//3.添加服務實例
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
//構建要注冊的服務實例對應的服務的key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//根據命名空間以及服務名獲取要注冊的服務實例對應的服務
Service service = getService(namespaceId, serviceName);
//使用synchronized鎖住要注冊的服務實例對應的服務
synchronized (service) {
//由于一個服務可能存在多個服務實例,所以需要根據當前注冊請求的服務實例ips,獲取對應服務的最新服務實例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//Instances實現了用于在Nacos集群進行網絡傳輸的Record接口
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//執行DelegateConsistencyServiceImpl的put()方法
consistencyService.put(key, instances);
}
} private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
//更新對應服務的服務實例列表
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
} //Compare and get new instance list.
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
//先獲取已經注冊到Nacos的、當前要注冊的服務實例對應的服務的、所有服務實例
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
//把instance實例的IP當作key,instance實例當作value,放入currentInstances
currentInstances.put(instance.toIpAddr(), instance);
//把實例唯一編碼添加到currentInstanceIds中
currentInstanceIds.add(instance.getInstanceId());
} //用來存放當前要注冊的服務實例對應的服務的、所有服務實例
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
//instanceMap的key與IP和端口有關
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
}
//最后instanceMap里肯定會包含新注冊的Instance實例
//并且如果不是第一次注冊,里面還會包含之前注冊的Instance實例信息
return new ArrayList<>(instanceMap.values());
}
...
} //Package of instance list.
public class Instances implements Record {
private List<Instance> instanceList = new ArrayList<>();
...
} public class KeyBuilder {
public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
public static final String NAMESPACE_KEY_CONNECTOR = "##";
... public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName) : buildPersistentInstanceListKey(namespaceId, serviceName);
} //返回的key形如:"com.alibaba.nacos.naming.iplist.ephemeral." + namespaceId + " + "##" + serviceName
private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {
return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
} public static boolean matchEphemeralKey(String key) {
//currently only instance list has ephemeral type:
return matchEphemeralInstanceListKey(key);
} public static boolean matchEphemeralInstanceListKey(String key) {
//判定key是否是以這樣的字符串開頭:"com.alibaba.nacos.naming.iplist.ephemeral."
return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
}
...
}

DelegateConsistencyServiceImpl的put()方法更新服務實例列表存儲時:首先會根據表示服務的key來選擇不同的ConsistencyService。如果是臨時服務實例,則調用DistroConsistencyServiceImpl的put()方法。如果是持久化服務實例,則調用PersistentConsistencyServiceDelegateImpl的put()方法。

在DistroConsistencyServiceImpl的put()方法中:首先會調用DistroConsistencyServiceImpl的onPut()方法,把包含當前注冊的服務實例的、最新服務實例列表存儲到DataStore中,然后調用DistroProtocol的sync()方法進行集群節點間的服務實例數據同步,其中DataStore用于存儲所有已注冊的服務實例數據。

而在DistroConsistencyServiceImpl的onPut()方法中:會先創建Datum對象,注入服務key和服務的所有服務實例Instances,然后才將Datum對象添加到DataStore的Map對象里。最后調用Notifier的addTask()方法添加一個數據變更的任務,也就是把key、action封裝成Pair對象,放入一個Notifier的阻塞隊列中。

注意:在DistroConsistencyServiceImpl初始化完成后,會提交一個進行無限for循環的任務給一個單線程的線程池來執行。無限for循環中會不斷從阻塞隊列中獲取Pair對象進行處理。而在進行服務實例注冊時,會往該任務的阻塞隊列添加Pair對象。

//Consistency delegate.
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
private final EphemeralConsistencyService ephemeralConsistencyService;
... @Override
public void put(String key, Record value) throws NacosException {
//如果是臨時實例,則調用DistroConsistencyServiceImpl.put()方法
//如果是持久化實例,則調用PersistentConsistencyServiceDelegateImpl.put()方法
mapConsistencyService(key).put(key, value);
} private ConsistencyService mapConsistencyService(String key) {
//根據不同的key選擇不同的ConsistencyService
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
...
} @DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final GlobalConfig globalConfig;
private final DistroProtocol distroProtocol;
private final DataStore dataStore;//用于存儲所有已注冊的服務實例數據
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
private volatile Notifier notifier = new Notifier();
... @PostConstruct
public void init() {
//初始化完成后,會將notifier任務提交給GlobalExecutor來執行
GlobalExecutor.submitDistroNotifyTask(notifier);
} @Override
public void put(String key, Record value) throws NacosException {
//把包含了當前注冊的服務實例的、最新的服務實例列表,存儲到DataStore對象中
onPut(key, value);
//在集群架構下,DistroProtocol.sync()方法會進行集群節點的服務實例數據同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
} public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
//創建Datum對象,把服務key和服務的所有服務實例Instances放入Datum對象中
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//添加到DataStore的Map對象里
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//添加處理任務
notifier.addTask(key, DataOperation.CHANGE);
}
... public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); //Add new notify task to queue.
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//tasks是一個阻塞隊列,把key、action封裝成Pair對象,放入隊列中
tasks.offer(Pair.with(datumKey, action));
} public int getTaskSize() {
return tasks.size();
} @Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
} private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey); int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
} //用于存儲所有已注冊的服務實例數據
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
dataMap.put(key, value);
}
...
}

(4)服務端接收到服務實例注冊請求后的處理總結

register()注冊方法會先從Request對象中獲取從客戶端傳過來的參數,然后在addInstance()方法中會創建一個可以表示服務的key,接著調用DelegateConsistencyServiceImpl的put()方法,根據這個key可以選擇具體的ConsistencyService實現類。

在這個put()方法中,通過key選擇的是EphemeralConsistencyService,所以會調用DistroConsistencyServiceImpl的put()方法處理服務實例列表。

在DistroConsistencyServiceImpl的put()方法中又調用了onPut()方法,即把key、Instances封裝成Datum對象,放入到DataStore的Map里。最后調用addTask()方法,將本次服務實例數據的變更包裝成Pair對象,然后放入到一個阻塞隊列里,由一個執行無限for循環的線程處理隊列。

3.注冊服務—如何實現高并發支撐上百萬服務注冊

(1)服務端處理客戶端的服務注冊請求梳理

(2)Nacos的異步任務設計思想

(3)異步任務和內存隊列源碼分析

(1)服務端處理客戶端的服務注冊請求梳理

Nacos客戶端自動注冊服務實例時,會通過HTTP的方式,請求"/nacos/v1/ns/instance"地址來調用Nacos服務端的實例注冊接口。通過該地址可以找到Nacos服務端naming模塊的InstanceController類。在這個類中有個register()方法,它就是服務端處理服務注冊請求的入口。在這個register()方法的最后,會調用Notifier的addTask()方法,也就是把key、action包裝成Pair對象,放入到一個BlockingQueue里。至此,InstanceController類中register()方法的注冊邏輯就執行完了。

(2)Nacos的異步任務設計思想

一.Nacos服務實例注冊的壓測性能

二.Nacos服務端添加和處理異步任務的流程

三.Nacos采用異步任務來處理服務注冊的好處—支撐高并發

一.Nacos服務實例注冊的壓測性能

參考服務發現性能測試報告。通過對3節點的集群進行服務發現性能壓測,可得到接口性能負載和容量。壓測容量服務數可達60W,實例注冊數達110W,集群運行持續穩定。注冊/查詢實例TPS達到13000以上,接口達到預期。

二.Nacos服務端添加和處理異步任務的流程

首先客戶端發起服務實例注冊,服務端把接收的參數包裝成一個Pair對象,最后放入到一個BlookingQueue里。這時對服務實例注冊接口的處理已結束,服務端返回客戶端響應消息了。

然后Nacos服務端會在后臺開啟一個單線程異步任務,這個任務會不斷地獲取BlookingQueue隊列中的Pair對象。從這個隊列獲取出Pair對象后,會把信息寫入注冊表,從而完成服務注冊。

三.Nacos采用異步任務來處理服務注冊的好處—支撐高并發

好處一:接口響應時效更快

其實Nacos服務端處理服務實例注冊的接口,并沒有執行真正注冊的動作。只是把信息包裝好,放入到隊列中,接口就結束返回響應給客戶端了。由于代碼邏輯非常簡單,所以響應時效會更快。

好處二:保證服務穩定性

哪怕同時有1千個、1萬個客戶端同時發起實例注冊請求接口,最后只是把服務實例注冊任務放入到一個阻塞隊列中。這就相當于使用消息隊列進行流量削峰一樣,后續復雜的處理邏輯,由消費者慢慢處理,異步任務就相當于消費者。

好處三:解決寫時并發沖突

Nacos服務端,只有一個單線程在處理隊列中的任務。也就是把阻塞隊列中的服務實例注冊信息,同步到Nacos的注冊表中。既然是單線程進行寫操作,所以就不用考慮多線程并發寫的問題。雖然只會有一個線程在進行寫,但是可能會有其他線程在進行讀。所以會存在讀寫并發沖突,此時Nacos會使用寫時復制策略來處理。

(3)異步任務和內存隊列源碼分析

一.異步任務的初始化和處理流程

二.關于無限for循環的問題

一.異步任務的初始化和處理流程

在創建DistroConsistencyServiceImpl類實例時,會直接創建一個實現了Runnable接口的Notifier類實例。

在DistroConsistencyServiceImpl類中有個init()方法。由于這個init()方法上加了@PostConstruct注解,所以在Spring創建這個類實例時會自動調用這個init()方法。init()方法會提交這個實現了Runnable接口的Notifier任務給線程池運行。

而在Notifier類的run()方法中,會通過無限for循環不斷從tasks阻塞隊列中獲取任務來進行處理。獲取出任務后,如果判斷出action類型為CHANGE類型,則先把Instances對象從DataStore類中取出來,再調用listener的onChange()方法來將服務實例信息寫入到注冊表中。

二.關于無限for循環的問題

無限循環是否合理、是否會占用CPU資源、如果異常是否會導致循環結束?

因為Nacos服務端要一直處理Nacos客戶端所發起的服務實例注冊請求,而Nacos服務端它是不知道到底有多少個客戶端需要進行服務注冊的,所以只能寫一個無限for循環一直不斷重復地去執行。

既然是無限循環,就要考慮是否占用CPU資源的問題。tasks是一個阻塞隊列BlockingQueue:第一.阻塞隊列的特點就是不會占用CPU的資源,第二.tasks的take()方法會一直阻塞直到取得元素或當前線程中斷。

在處理過程中,如果拋出未知異常,會直接被for循環中的try catch掉,繼續循環處理下一個任務。

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final GlobalConfig globalConfig;
private final DistroProtocol distroProtocol;
private final DataStore dataStore;//用于存儲所有已注冊的服務實例數據
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
private volatile Notifier notifier = new Notifier();
... @PostConstruct
public void init() {
//初始化完成后,會將notifier任務提交給GlobalExecutor來執行
GlobalExecutor.submitDistroNotifyTask(notifier);
}
... public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); //Add new notify task to queue.
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//tasks是一個阻塞隊列,把key、action封裝成Pair對象,放入隊列中
tasks.offer(Pair.with(datumKey, action));
} @Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
//無限循環
for (; ;) {
try {
//從阻塞隊列中獲取任務
Pair<String, DataOperation> pair = tasks.take();
//處理任務
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
} private void handle(Pair<String, DataOperation> pair) {
try {
//把在DistroConsistencyServiceImpl.onPut()方法創建的key和action取出來
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey); int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
//把Instances信息寫到注冊表里去
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
... @Override
public void put(String key, Record value) throws NacosException {
//把包含了當前注冊的服務實例的、最新的服務實例列表,存儲到DataStore對象中
onPut(key, value);
//在集群架構下,DistroProtocol.sync()方法會進行集群節點的服務實例數據同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
} public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
//創建Datum對象,把服務key和服務的所有服務實例Instances放入Datum對象中
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//添加到DataStore的Map對象里
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//添加處理任務
notifier.addTask(key, DataOperation.CHANGE);
}
} public class GlobalExecutor {
private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR =
ExecutorFactory.Managed.newSingleScheduledExecutorService(
ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier")
);
... public static void submitDistroNotifyTask(Runnable runnable) {
//向線程池提交任務,讓線程池執行任務
DISTRO_NOTIFY_EXECUTOR.submit(runnable);
}
...
} public class NameThreadFactory implements ThreadFactory {
private final AtomicInteger id = new AtomicInteger(0);
private String name; public NameThreadFactory(String name) {
if (!name.endsWith(StringUtils.DOT)) {
name += StringUtils.DOT;
}
this.name = name;
} @Override
public Thread newThread(Runnable r) {
String threadName = name + id.getAndDecrement();
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
} public final class ExecutorFactory {
...
public static final class Managed {
private static final String DEFAULT_NAMESPACE = "nacos";
private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
... //Create a new single scheduled executor service with input thread factory and register to manager.
public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
//注冊到ThreadPoolManager可以方便管理ScheduledExecutorService,比如注銷、銷毀
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}
...
}
...
} public final class ThreadPoolManager {
private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);
... //Register the thread pool resources with the resource manager.
public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) {
synchronized (this) {
lockers.put(namespace, new Object());
}
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
if (map == null) {
map = new HashMap<String, Set<ExecutorService>>(8);
map.put(group, new HashSet<ExecutorService>());
map.get(group).add(executor);
resourcesManager.put(namespace, map);
return;
}
if (!map.containsKey(group)) {
map.put(group, new HashSet<ExecutorService>());
}
map.get(group).add(executor);
}
} //Cancel the uniform lifecycle management for all threads under this resource.
public void deregister(String namespace, String group) {
if (resourcesManager.containsKey(namespace)) {
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
resourcesManager.get(namespace).remove(group);
}
}
}
...
}

總結:異步任務是提升性能的一種方式。很多開源框架為了提升自身處理性能,都會采利用異步任務 + 內存隊列。

4.內存注冊表—如何處理注冊表的高并發讀寫沖突

(1)服務實例注冊的客戶端源碼和服務端源碼梳理

(2)Nacos注冊表結構

(3)寫時復制機制介紹

(4)Nacos服務注冊寫入注冊表源碼分析

(1)服務實例注冊的客戶端源碼和服務端源碼梳理

一.客戶端發起服務注冊的源碼梳理

訂單服務、庫存服務的項目引入nacos-discovery服務注冊中心依賴后,當項目啟動時,就會掃描到依賴中的spring.factories文件,然后去創建spring.factories文件中定義的配置類。

在spring.factories文件中:有一個名為NacosServiceRegistryAutoConfiguration配置類,在這個配置類定義了三個Bean對象:NacosServiceRegistry、NacosRegistration和NacosAutoServiceRegistration。

NacosAutoServiceRegistration類的父類實現了ApplicationListener接口,也就是實現了onApplicationEvent()這個監聽事件方法。當Spring容器啟動時,會發布WebServerInitializedEvent監聽事件,從而被Nacos客戶端即NacosAutoServiceRegistration的監聽方法監聽到。

這個監聽事件方法會調用NacosServiceRegistry類中的register()方法,register()方法又會調用Nacos服務端實例注冊的HTTP接口完成服務注冊。

在發起服務實例注冊接口的調用前,客戶端還會開啟一個BeatTask任務,這個BeatTask任務會每隔5秒向Nacos服務端發送心跳檢查請求。

二.服務端處理服務注冊的源碼梳理

Nacos服務端處理服務注冊的HTTP接口是:/nacos/v1/ns/instance。由于Nacos服務端也是個Spring Boot項目,所以通過架構圖找到Nacos源碼的naming模塊,然后就可以通過請求地址定位到InstanceController類。

在InstanceController類中會有對應HTTP接口的register()方法,該方法最終會把客戶端的實例對象包裝成Datum對象放入DataStore類中,然后再包裝一個Pair對象,放入Notifier的tasks內存阻塞隊列。

DistroConsistencyServiceImpl中有個@PostConstruct修飾的init()方法。在該類被實例化后,這個init()方法會把一個Notifier任務提交給一個線程池執行。

Notifier的run()方法,首先會不斷循環從tasks阻塞隊列中獲取Pair對象,然后調用Notifier的handle()方法把Instances對象從DataStore類中取出來,接著調用listener.onChange()方法把服務實例數據寫入到注冊表中。

(2)Nacos注冊表結構

一.Nacos注冊表的使用

在ServiceManager類中有一個serviceMap屬性,它就是Nacos的內存注冊表,Nacos注冊表就是用來存放微服務實例注冊信息的地方。客戶端在調用其他微服務時,會先調用Nacos查詢實例列表接口,查詢當前可用服務,從而發起微服務調用。

//Core manager storing all services in Nacos.@Component
public class ServiceManager implements RecordListener<Service> {
//注冊表,Map(namespace, Map(group::serviceName, Service)).
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
... public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
} public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
...
} @JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
...
} public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
//持久化實例列表
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>(); //臨時實例列表
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();
...
}

二.Nacos注冊表的結構分析

ServiceManager的serviceMap屬性,即注冊表結構由兩層Map組合而成。也就是:Map(namespace, Map(group::serviceName, Service))。

Nacos支持對服務進行分類,最上層是一個命名空間Namespace。命名空間Namespace默認是public,也可以自定義為dev、test等。

在public命名空間下,可以包含不同的分組Group。比如定義兩個分組Group:DEFAULT_GROUP_1、DEFAULT_GROUP_2。這樣命名空間Namespace和分組Group就對應注冊表最外層的兩個Map。

在ServiceManager.serviceMap的內層Map中,其value是個Service對象。在Service類中,有一個clusterMap屬性。clusterMap的key是對應的集群名字,如北京集群、廣州集群等。clusterMap的value是個Cluster對象,用來存放某集群下的所有實例對象。

在Cluster類中,存在兩個不同實例類型的Set集合,這兩個集合就會存儲具體的Instance實例對象,Instance實例對象里會包含實例的IP、Port等信息。

三.Nacos注冊表的設計原因

之所以Nacos要這么設計注冊表,那是為了靈活應對不同的使用場景。如果項目簡單,測試、預發、生產不同環境都使用同一個Nacos服務端,那么可以通過命名空間來區分。

如果項目復雜,不同環境使用不同的Nacos服務端,那么可以通過命名空間來區分不同的模塊。而訂單模塊下可以細分很多微服務,然后通過分組來區分不同的環境。包括在Service對象里,同一個服務也可能在多個地區都有部署。比如北京服務器部署2臺、廣州服務器部署2臺等。

(3)寫時復制機制介紹

Nacos服務端把新注冊的實例寫入到注冊表中,用的就是寫時復制機制。寫時復制機制,能夠很好地避免讀寫并發沖突。

寫時復制:Copy On Write。在數據寫入到某存儲位置時,首先將原有內容拷貝出來,寫到另一處地方,然后再將原來的引用地址修改成新對象的地址。

下面展示了一個并發沖突的例子:

public static void main(String[] args) {
//假設objectSet是用來存放實例信息
Set<Object> objectSet = new HashSet<>(); //模擬異步任務,寫入數據
new Thread(new Runnable() {
@Override
public void run() {
try {
//先睡眠一下,否則還沒開始讀,就已經寫完了
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//寫入10w條數據
for (int i = 0; i < 100000; i++) {
objectSet.add(i);
}
}
}).start(); //死循環一直讀取數據,模擬高并發場景
for (; ;) {
for (Object o : objectSet) {
System.out.println(o);
}
}
}

運行上面的代碼就會拋出如下異常信息:

Exception in thread "main" java.util.ConcurrentModificationException

意思是在對集合迭代、讀取時,如果同時對其進行修改,就會拋出ConcurrentModificationException異常。

這時候就可以采用寫時復制來避免這個問題。先創建一個復制對象,把原來的數據復制一份到該復制對象上。然后在復制對象上進行新增、修改的操作,這時是不會影響原來數據的。等到在復制對象上進行的操作完成之后,再把原來對象的引用地址直接修改為復制對象的引用。

(4)Nacos服務注冊寫入注冊表源碼分析

在執行Notifier的handle()方法時,核心的代碼是:

//把Instances信息寫到注冊表里去
listener.onChange(datumKey, dataStore.get(datumKey).value);

dataStore.get(datumKey).value就是從DataStore中獲取Instances對象。listener.onChange()其實就是調用Service的onChange()方法更新注冊表。

因為在注冊某個服務的第一個實例時,創建的服務Service會作為Listener添加到ConsistencyService的listeners,并且已經將新創建的服務Service放入到了ServiceManager的注冊表中了。所以線程池執行Notifier的handle()方法時,就能遍歷所有Service進行更新。

其實注冊表serviceMap只是存放了Service對象的引用,而ConsistencyService的listeners也存放了Service對象的引用。當遍歷ConsistencyService的listeners,執行Service.onChange()方法時,更新的就是JVM在堆內存中的Service實例對象,也就更新了注冊表。因為注冊表是一個Map,最終都是引用到對內存中的Service實例對象。

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final DataStore dataStore;
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
... public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
... @Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
//無限循環
for (; ;) {
try {
//從阻塞隊列中獲取任務
Pair<String, DataOperation> pair = tasks.take();
//處理任務
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
} private void handle(Pair<String, DataOperation> pair) {
try {
//把在DistroConsistencyServiceImpl.onPut()方法創建的key和action取出來
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey); int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
//把Instances信息寫到注冊表里去
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
... @Override
public void put(String key, Record value) throws NacosException {
//把包含了當前注冊的服務實例的、最新的服務實例列表,存儲到DataStore對象中
onPut(key, value);
//在集群架構下,DistroProtocol.sync()方法會進行集群節點的服務實例數據同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
} //Put a new record.
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
//創建Datum對象,把服務key和服務的所有服務實例Instances放入Datum對象中
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//添加到DataStore的Map對象里
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//添加處理任務
notifier.addTask(key, DataOperation.CHANGE);
}
...
} //Store of data.
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
... public Datum get(String key) {
return dataMap.get(key);
}
...
} public class Datum<T extends Record> implements Serializable {
public String key;
public T value;
...
} //Package of instance list.
public class Instances implements Record {
private List<Instance> instanceList = new ArrayList<>();
...
} //服務管理者,擁有所有的服務列表,用于管理所有服務的注冊、銷毀、修改等
@Component
public class ServiceManager implements RecordListener<Service> {
//注冊表,Map(namespace, Map(group::serviceName, Service)).
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); @Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService; ...
//Register an instance to a service in AP mode.
//This method creates service or cluster silently if they don't exist.
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//1.創建一個空的服務
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//2.根據命名空間ID、服務名獲取一個服務,如果獲取結果為null則拋異常
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//3.添加服務實例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
} ...
//1.創建一個空服務
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
} //Create service if not exist.
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
//now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate(); putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
} private void putServiceAndInit(Service service) throws NacosException {
//把Service放入注冊表serviceMap中
putService(service);
service.init();
//把Service作為監聽器添加到consistencyService的listeners中
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
} //Put service into manager.
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
...
}

其中從DataStore中獲取出來的Instances對象的來源如下:

//服務管理者,擁有所有的服務列表,用于管理所有服務的注冊、銷毀、修改等
@Component
public class ServiceManager implements RecordListener<Service> {
//Map(namespace, Map(group::serviceName, Service)).
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); @Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
... //添加服務實例
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
//構建要注冊的服務實例對應的服務的key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //根據命名空間以及服務名獲取要注冊的服務實例對應的服務
Service service = getService(namespaceId, serviceName); //使用synchronized鎖住要注冊的服務實例對應的服務
synchronized (service) {
//由于一個服務可能存在多個服務實例,所以需要根據當前注冊請求的服務實例ips,獲取對應服務的最新服務實例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//Instances實現了用于在Nacos集群進行網絡傳輸的Record接口
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//執行DelegateConsistencyServiceImpl的put()方法
consistencyService.put(key, instances);
}
} private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
//更新對應服務的服務實例列表
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
} //Compare and get new instance list.
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
//先獲取已經注冊到Nacos的、當前要注冊的服務實例對應的服務的、所有服務實例
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
//把instance實例的IP當作key,instance實例當作value,放入currentInstances
currentInstances.put(instance.toIpAddr(), instance);
//把實例唯一編碼添加到currentInstanceIds中
currentInstanceIds.add(instance.getInstanceId());
} //用來存放當前要注冊的服務實例對應的服務的、所有服務實例
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
} for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
//instanceMap的key與IP和端口有關
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
}
//最后instanceMap里肯定會包含新注冊的Instance實例
//并且如果不是第一次注冊,里面還會包含之前注冊的Instance實例信息
return new ArrayList<>(instanceMap.values());
}
...
} //Package of instance list.
public class Instances implements Record {
private List<Instance> instanceList = new ArrayList<>();
...
}

接下來是Service的onChange()方法的詳情:

Service的onChange()方法需要傳入兩個參數:參數一是key,這個key是由KeyBuilder的buildInstanceListKey()代碼創建出來的。參數二是Instances,里面有個InstanceList屬性,可以存放多個Instance實例對象。實際上Instances參數可能會包含之前多個已經注冊的Instance實例信息,并且一定會包含當前新注冊的Instance實例信息。

Service的onChange()方法,最后會調用Service的updateIPs()方法。Service的updateIPs()方法又會調用Cluster的updateIps()方法,會把新注冊的Instance更新到Cluster對象實例中。

在Cluster的updateIps()方法中,便會通過寫時復制機制來更新實例Set。如果不用寫時復制,那么就會并發讀寫同一個Set對象。如果使用寫時復制,那么同一時間的讀和寫都是不同的Set對象。即使用新對象替換舊對象那一刻還有線程沒迭代讀完舊對象,也不影響。因為沒有迭代讀完舊對象的線程繼續進行迭代讀,替換的只是對象引用。ephemeralInstances變量只是引用了Set對象的地址而已。這里說的替換,只是讓ephemeralInstances變量引用另外Set對象的地址。

//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model,
//in which service stores a list of clusters, which contain a list of instances.
//his class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
... @Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
//Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
} //Update instances. 這里的instances里就包含了新注冊的實例對象
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
//clusterMap表示的是該服務的集群
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
} //遍歷全部實例對象:包括已經注冊過的實例對象 和 新注冊的實例對象
//這里的作用就是對相同集群下的instance進行分類
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
//判定客戶端傳過來的instance實例中,是否設置了ClusterName
if (StringUtils.isEmpty(instance.getClusterName())) {
//如果否,就設置instance實例的ClusterName為DEFAULT
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
//判斷之前是否存在對應的CLusterName,如果沒有則需要創建新的Cluster對象
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
//創建新的Cluster集群對象
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
//將新創建的Cluster對象放入到集群clusterMap中
getClusterMap().put(instance.getClusterName(), cluster);
}
//根據集群名字,從ipMap里面獲取集群下的所有實例
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
//將客戶端傳過來的新注冊的instance實例,添加到clusterIPs,也就是ipMap中
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
} //對所有的服務實例分好類之后,按照ClusterName來更新注冊表
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//entryIPs已經是根據ClusterName分好組的實例列表了
List<Instance> entryIPs = entry.getValue();
//調用Cluster.updateIps()方法,根據寫時復制,對注冊表中的每一個Cluster對象進行更新
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());
}
...
} public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>(); @JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>(); @JsonIgnore
private Service service;
... //Update instance list.
public void updateIps(List<Instance> ips, boolean ephemeral) {
//先判定是否是臨時實例,然后把對應的實例數據取出來,放入到新創建的toUpdateInstances集合中
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
//將老的實例列表toUpdateInstances復制一份到oldIpMap中
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
... //最后把傳入進來的實例列表,重新初始化一個HaseSet,賦值給toUpdateInstances
toUpdateInstances = new HashSet<>(ips); //判斷是否是臨時實例,將Cluster的persistentInstances或ephemeralInstances替換為toUpdateInstances
if (ephemeral) {
//直接把之前的實例列表替換成新的
ephemeralInstances = toUpdateInstances;
} else {
//直接把之前的實例列表替換成新的
persistentInstances = toUpdateInstances;
}
}
...
}

從這部分源碼中就可以看出,全程都沒有對之前注冊表中的數據進行操作。而是先拿出來,最后直接把新的數據替換過去,這樣就完成了注冊表修改。從而避免了對Set的并發讀寫沖突。

總結

以上是生活随笔為你收集整理的Nacos源码—1.Nacos服务注册发现分析一的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。