nacos客户端源码分析

一、概述

Windows如何安装部署nacos

springboot集成nacos

什么是 Nacos

二、源码

客户端重点关注三点:服务注册、心跳机制、服务发现

1、服务注册

将ip和port等信息注册到服务端,将服务暴露出来供其它服务使用

根据springboot的自动装配机制,查看spring-cloud-alibaba-nacos-discovery依赖中的 /META-INF/spring.factories 文件:

NacosDiscoveryAutoConfiguration:

/**
 * Configuration class that contributes the Nacos discovery beans.
 *
 * <p>It is guarded by {@code @ConditionalOnDiscoveryEnabled} and
 * {@code @ConditionalOnNacosDiscoveryEnabled}; each bean is only created when no bean of the
 * same type is already present ({@code @ConditionalOnMissingBean}).
 */
@Configuration
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

    /**
     * Registers the discovery properties bean.
     *
     * <p>Per the original author's note, this bean carries the settings prefixed with
     * {@code spring.cloud.nacos.discovery}.
     *
     * @return a freshly constructed {@code NacosDiscoveryProperties}
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        final NacosDiscoveryProperties properties = new NacosDiscoveryProperties();
        return properties;
    }

    /**
     * Registers the Nacos service-discovery bean.
     *
     * @param discoveryProperties the Nacos discovery properties
     * @param nacosServiceManager the Nacos service manager
     * @return a {@code NacosServiceDiscovery} built from both collaborators
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosServiceDiscovery nacosServiceDiscovery(
            NacosDiscoveryProperties discoveryProperties,
            NacosServiceManager nacosServiceManager) {
        final NacosServiceDiscovery discovery =
                new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
        return discovery;
    }
}

RibbonNacosAutoConfiguration:

/**
 * Annotation-only configuration class: the body is empty and all behavior comes from the
 * annotations.
 *
 * <p>{@code @RibbonClients} names {@code NacosRibbonClientConfiguration} as the default Ribbon
 * client configuration. The class is ordered after {@code RibbonAutoConfiguration} and is
 * conditional on a {@code SpringClientFactory} bean, on {@code @ConditionalOnRibbonNacos} and on
 * {@code @ConditionalOnNacosDiscoveryEnabled}.
 */
@Configuration
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnRibbonNacos
// spring.cloud.nacos.discovery.enabled: enabled by default (per the original author's note)
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = NacosRibbonClientConfiguration.class)
public class RibbonNacosAutoConfiguration {

}

NacosServiceRegistryAutoConfiguration

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

  // nacos服务注册类,核心注册方法:NacosServiceRegistry#register【重要】
	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

  // 该类中有一个初始化init()方法,可做一些定制的处理
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
	}

  // nacos服务自动注册【重要】
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,	autoServiceRegistrationProperties, registration);
	}
}

// new NacosAutoServiceRegistration()
/**
 * Creates the auto-registration bean and stores the registration it will publish.
 *
 * @param serviceRegistry the registry handed to the parent class
 * @param autoServiceRegistrationProperties the auto-registration properties handed to the parent class
 * @param registration the Nacos registration kept in {@code this.registration}
 */
public NacosAutoServiceRegistration(ServiceRegistry serviceRegistry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
   // The parent class is AbstractAutoServiceRegistration, which (per the original author's
   // note) implements ApplicationListener and listens for WebServerInitializedEvent.
   // Once Tomcat has started, Spring Boot publishes a WebServerInitializedEvent;
   // AbstractAutoServiceRegistration receives it and its onApplicationEvent method is invoked.
		super(serviceRegistry, autoServiceRegistrationProperties);
		this.registration = registration;
	}

AbstractAutoServiceRegistration#onApplicationEvent

/**
 * Event callback that forwards the received event to {@code bind}.
 *
 * @param event the web-server-initialized event
 */
public void onApplicationEvent(WebServerInitializedEvent event) {
    // On receiving a WebServerInitializedEvent, delegate to bind().
		bind(event);
	}

/**
 * Records the web server port and starts registration, unless the event comes from a
 * web-server context whose server namespace is {@code "management"}.
 *
 * @param event the web-server-initialized event
 */
public void bind(WebServerInitializedEvent event) {
    final ApplicationContext appContext = event.getApplicationContext();

    // Events from the "management" namespace are skipped.
    final boolean fromManagementContext =
            appContext instanceof ConfigurableWebServerApplicationContext
                    && "management".equals(
                            ((ConfigurableWebServerApplicationContext) appContext).getServerNamespace());
    if (fromManagementContext) {
        return;
    }

    // Only takes effect while the stored port is still 0.
    final int webServerPort = event.getWebServer().getPort();
    this.port.compareAndSet(0, webServerPort);

    // Begin registration.
    this.start();
}

AbstractAutoServiceRegistration#start

/**
 * Runs the registration sequence once: publishes the pre-registration event, registers the
 * service (and the management service when applicable), publishes the registered event, then
 * flips the {@code running} flag.
 *
 * <p>Does nothing when the lifecycle is disabled or when {@code running} is already set.
 */
public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    // Already running: nothing left to do.
    if (this.running.get()) {
        return;
    }

    // Announce that registration is about to happen.
    final InstancePreRegisteredEvent preRegistered =
            new InstancePreRegisteredEvent(this, getRegistration());
    this.context.publishEvent(preRegistered);

    // Per the original author's note this ends up in NacosServiceRegistry#register (important).
    register();
    if (shouldRegisterManagement()) {
        registerManagement();
    }

    // Announce that registration has completed.
    this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
    this.running.compareAndSet(false, true);
}

NacosServiceRegistry#register

	/**
	 * Registers the given registration with Nacos through the {@code NamingService}.
	 *
	 * <p>When the registration has no service id, a warning is logged and nothing is registered.
	 * Any exception raised while registering is logged and then handed to
	 * {@code rethrowRuntimeException}.
	 *
	 * @param registration the registration to publish
	 */
	public void register(Registration registration) {
		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No service to register for nacos client...");
			return;
		}

		// Collect everything the call needs: naming service, service id, group and instance.
		final NamingService naming = namingService();
		final String serviceName = registration.getServiceId();
		final String groupName = nacosDiscoveryProperties.getGroup();
		// Wraps the instance details (ip, port, cluster, ... per the original author's note).
		final Instance nacosInstance = getNacosInstanceFromRegistration(registration);

		try {
			// The actual registration call (important).
			naming.registerInstance(serviceName, groupName, nacosInstance);
			// Logged only after a successful registration.
			log.info("nacos registry, {} {} {}:{} register finished", groupName, serviceName,
					nacosInstance.getIp(), nacosInstance.getPort());
		}
		catch (Exception registrationFailure) {
			log.error("nacos registry, {} register failed...{},", serviceName,
					registration.toString(), registrationFailure);
			// rethrow a RuntimeException if the registration is failed.
			// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
			rethrowRuntimeException(registrationFailure);
		}
	}

NacosNamingService#registerInstance

/**
 * Validates the instance, starts its heartbeat when it is ephemeral, then registers it
 * through the server proxy.
 *
 * @param serviceName the service name
 * @param groupName the group name
 * @param instance the instance to register
 * @throws NacosException declared by the method; raised by the calls it makes
 */
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);

    // Per the original author's note, instances are ephemeral by default, so this branch
    // is normally taken.
    final boolean ephemeral = instance.isEphemeral();
    if (ephemeral) {
        // Adds the heartbeat-reporting task (important).
        final BeatInfo heartbeat = beatReactor.buildBeatInfo(groupedName, instance);
        beatReactor.addBeatInfo(groupedName, heartbeat);
    }

    // The registration itself is delegated to serverProxy.registerService.
    serverProxy.registerService(groupedName, groupName, instance);
}

NamingProxy#registerService

/**
 * Builds the registration parameters for the instance and sends them to the server with an
 * HTTP POST through {@code reqApi}.
 *
 * <p>The parameter map is declared as {@code Map<String, String>}; the excerpt had lost the
 * type arguments and left a raw {@code Map}/{@code HashMap}.
 *
 * @param serviceName the service name placed under {@code CommonParams.SERVICE_NAME}
 * @param groupName the group name placed under {@code CommonParams.GROUP_NAME}
 * @param instance the instance whose fields are sent
 * @throws NacosException declared by the method; raised by {@code reqApi}
 */
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    // Server-side registration endpoint, per the original author's note:
    // UtilAndComs.nacosUrlInstance = "/nacos/v1/ns/instance"
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

NamingProxy#reqApi

/**
 * Sends the request to a Nacos server, retrying until one call succeeds or all attempts fail.
 *
 * <p>The original author does not walk through this method in detail and notes that it
 * ultimately sends an HTTP client request to the server.
 *
 * <p>When {@code nacosDomain} is set, the domain is called up to {@code maxRetry} times.
 * Otherwise the call starts at a random index into {@code servers} and moves through the list
 * round-robin until each server has been tried once.
 *
 * <p>The parameters are declared as {@code Map<String, String>} and {@code List<String>}; the
 * excerpt had raw types, under which {@code String server = servers.get(index)} does not compile.
 *
 * @param api the API path
 * @param params request parameters; the namespace id is always overwritten here
 * @param body the request body
 * @param servers candidate server addresses; may be empty only when {@code nacosDomain} is set
 * @param method the HTTP method
 * @return the response of the first successful call
 * @throws NacosException with {@code INVALID_PARAM} when neither servers nor a domain are
 *         available, or carrying the last failure's error code once every attempt has failed
 */
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }
        // Holds the most recent failure so it can be reported after all attempts.
        NacosException exception = new NacosException();
        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < maxRetry; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        } else {
            // Random starting point, then round-robin over the whole list.
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                index = (index + 1) % servers.size();
            }
        }
        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
        throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

    }

2、心跳机制

AP模式下,发起http请求给服务端,维护服务的状态,默认5秒执行一次(CP模式是服务端主动检查)

NacosNamingService#registerInstance

注册服务前,会添加心跳上报线程

/**
 * Checks the instance, adds a heartbeat task for it when it is ephemeral, and then registers
 * it via the server proxy.
 *
 * @param serviceName the service name
 * @param groupName the group name
 * @param instance the instance to register
 * @throws NacosException declared by the method; raised by the calls it makes
 */
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    final String qualifiedName = NamingUtils.getGroupedName(serviceName, groupName);

    // Per the original author's note, instances are ephemeral by default, so the heartbeat
    // is normally set up here before the registration request goes out.
    if (instance.isEphemeral()) {
        // Adds the heartbeat-reporting task (important).
        final BeatInfo beat = beatReactor.buildBeatInfo(qualifiedName, instance);
        beatReactor.addBeatInfo(qualifiedName, beat);
    }

    serverProxy.registerService(qualifiedName, groupName, instance);
}

BeatReactor#addBeatInfo

/**
 * Stores the beat info under its key, marking any previous entry for that key as stopped,
 * and schedules the first heartbeat task.
 *
 * @param serviceName the service name used to build the key
 * @param beatInfo the heartbeat data; its period is the initial scheduling delay in milliseconds
 */
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);

    final String beatKey = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());

    //fix #1733
    // Stop the task chain of an entry that was already stored under this key.
    final BeatInfo previous = dom2Beat.remove(beatKey);
    if (previous != null) {
        previous.setStopped(true);
    }
    dom2Beat.put(beatKey, beatInfo);

    // Create the BeatTask heartbeat job (per the original author's note, the client sends a
    // heartbeat every 5 seconds by default).
    final BeatTask firstBeat = new BeatTask(beatInfo);
    executorService.schedule(firstBeat, beatInfo.getPeriod(), TimeUnit.MILLISECONDS);

    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
class BeatTask implements Runnable {
        BeatInfo beatInfo;
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                      	// 本质还是调用上面我们看的注册服务方法
                        serverProxy.registerService(beatInfo.getServiceName(),  NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",  JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

3、服务发现

通过http获取需要的服务实例的列表,并且定时拉取最新数据,默认1秒执行一次;

通过udp协议,服务端推送变动消息来更新服务实例列表;

服务发现本质上是基于Ribbon实现的,入口是:DynamicServerListLoadBalancer

/**
 * Stores the server list source, filter and updater, then runs the remaining initialization.
 *
 * @param clientConfig client configuration, passed to the parent constructor and to {@code restOfInit}
 * @param rule load-balancing rule, passed to the parent constructor
 * @param ping ping strategy, passed to the parent constructor
 * @param serverList source of the server list, kept in {@code serverListImpl}
 * @param filter server list filter; receives the load balancer stats when it is an
 *        {@code AbstractServerListFilter}
 * @param serverListUpdater updater kept in {@code serverListUpdater}
 */
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList serverList, ServerListFilter filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        // Initialization
        restOfInit(clientConfig);
    }

/**
 * Finishes initialization: starts the server list updater, loads the server list once, and
 * primes connections when priming was enabled.
 *
 * @param clientConfig client configuration; only its client name is used here, for logging
 */
void restOfInit(IClientConfig clientConfig) {
    // Remember the priming setting so it can be restored at the end.
    final boolean primingWasEnabled = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);

    // Start the task that keeps the server list in sync.
    enableAndInitLearnNewServersFeature();

    // Load the server list once right away.
    updateListOfServers();

    if (primingWasEnabled) {
        if (this.getPrimeConnections() != null) {
            this.getPrimeConnections().primeConnections(getReachableServers());
        }
    }
    this.setEnablePrimingConnections(primingWasEnabled);

    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

enableAndInitLearnNewServersFeature()

/**
 * Logs the simple class name of the configured server list updater and starts it with
 * {@code updateAction}.
 */
public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

/**
 * Activates the updater and schedules the periodic refresh task.
 *
 * <p>Only the call that flips {@code isActive} from {@code false} to {@code true} schedules
 * the task; every other call just logs that the updater is already active.
 *
 * @param updateAction the action run on every refresh cycle
 */
public synchronized void start(final UpdateAction updateAction) {
    if (!isActive.compareAndSet(false, true)) {
        logger.info("Already active, no-op");
        return;
    }

    final Runnable refreshTask = new Runnable() {
        @Override
        public void run() {
            if (isActive.get()) {
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    // A failed cycle is logged and the schedule continues.
                    logger.warn("Failed one update cycle", e);
                }
                return;
            }
            // No longer active: cancel the scheduled task if there is one.
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }
    };

    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            refreshTask, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
}

updateListOfServers()

/**
 * Fetches the current server list, applies the filter when one is set, and passes the result
 * to {@code updateAllServerList}.
 *
 * <p>When no server list source is configured, an empty list is passed on. The list is
 * declared as {@code List<T>}; the excerpt had lost the type arguments and left a raw
 * {@code List}/{@code ArrayList}.
 */
public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            // Per the original author's note, with Nacos this calls
            // NacosServerList.getUpdatedListOfServers() to obtain the service list.
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",  getIdentifier(), servers);
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        // Apply the new server list.
        updateAllServerList(servers);
    }

/**
 * Marks every server in the list as alive, installs the list and forces a quick ping.
 *
 * <p>The update is skipped when another update is already in progress, as tracked by
 * {@code serverListUpdateInProgress}. The parameter is declared as {@code List<T>}; with the
 * raw {@code List} of the excerpt, the loop {@code for (T s : ls)} does not compile.
 *
 * @param ls the servers to install
 */
protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                // Always release the in-progress flag, even when the update throws.
                serverListUpdateInProgress.set(false);
            }
        }
    }
展开阅读全文

页面更新:2024-03-12

标签:初始化   线程   服务端   源码   客户端   实例   本质   机制   事件   发现   方法   列表

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top