Windows如何安装部署nacos
springboot集成nacos
什么是 Nacos
客户端重点关注三点:服务注册、心跳机制、服务发现
1、服务注册
将ip和port等信息注册到服务端,将服务暴露出来供其它服务使用
根据springboot的自动装配机制,查看spring-cloud-alibaba-nacos-discovery依赖中的:/META-INF/spring.factories 件:
NacosDiscoveryAutoConfiguration:
@Configuration
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
// 注册bean:加载spring.cloud.nacos.discovery打头的配置
@Bean
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
// 注册bean:nacos服务发现
@Bean
@ConditionalOnMissingBean
public NacosServiceDiscovery nacosServiceDiscovery(
NacosDiscoveryProperties discoveryProperties,
NacosServiceManager nacosServiceManager) {
return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
}
}
RibbonNacosAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnRibbonNacos
// spring.cloud.nacos.discovery.enabled 默认开启
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = NacosRibbonClientConfiguration.class)
public class RibbonNacosAutoConfiguration {
}
NacosServiceRegistryAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
// nacos服务注册类,核心注册方法:NacosServiceRegistry#register【重要】
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
// 该类中有一个初始化init()方法,可做一些定制的处理
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
}
// nacos服务自动注册【重要】
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
}
// new NacosAutoServiceRegistration()
public NacosAutoServiceRegistration(ServiceRegistry serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
// 父类AbstractAutoServiceRegistration【父类实现了ApplicationListener接口】
// 并监听了WebServerInitializedEvent事件
// 则tomcat启动成功之后,springboot会发送WebServerInitializedEvent事件
// 然后AbstractAutoServiceRegistration监听该事件,触发onApplicationEvent方法
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
AbstractAutoServiceRegistration#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
// 监听到WebServerInitializedEvent事件调用bind方法
bind(event);
}
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
// 开始注册
this.start();
}
AbstractAutoServiceRegistration#start
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
// 发送【预注册】事件
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
// 调用NacosServiceRegistry#register方法进行注册【重要】
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// 发送注册完毕事件
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
NacosServiceRegistry#register
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 获取NamingService
NamingService namingService = namingService();
// 获取服务id
String serviceId = registration.getServiceId();
// 获取分组名称
String group = nacosDiscoveryProperties.getGroup();
// 封装实例信息Instance(ip、port、cluster等等)
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 调用namingService.registerInstance方法进行注册【重要】
namingService.registerInstance(serviceId, group, instance);
// 注册成功之后会有日志输出
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
} catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
NacosNamingService#registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断实例是否是临时的,默认是临时的,所以会走到该方法
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳上报线程【重要】,我们后面再看心跳机制的流程
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 又调用serverProxy.registerService方法进行注册
serverProxy.registerService(groupedServiceName, groupName, instance);
}
NamingProxy#registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);
final Map params = new HashMap(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 服务端注册接口名称、所需参数等可参考上图
// UtilAndComs.nacosUrlInstance = "/nacos//v1/ns/instance";
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
NamingProxy#reqApi
// 该方法就不详细看了,最终向服务端发送HttpClientRequest
public String reqApi(String api, Map params, Map body, List servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
2、心跳机制
AP模式下,发起http请求给服务端,维护服务的状态,默认5秒执行一次(CP模式是服务端主动检查)
NacosNamingService#registerInstance
注册服务前,会添加心跳上报线程
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断实例是否是临时的,默认是临时的,所以会走到该方法
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳上报线程【重要】
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
BeatReactor#addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 创建BeatTask心跳线程【客户端默认5秒发送一次心跳】
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
// 本质还是调用上面我们看的注册服务方法
serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
3、服务发现
通过http获取需要的服务实例的列表,并且定时拉取最新数据,默认1秒执行一次;
通过udp协议,服务端推送变动消息来更新服务实例列表;
服务发现本质是基于Ribbon的服务发现的:DynamicServerListLoadBalancer
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList serverList, ServerListFilter filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
// 初始化
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// 开启初始化同步服务列表线程
enableAndInitLearnNewServersFeature();
// 更新服务列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
enableAndInitLearnNewServersFeature()
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
updateListOfServers()
public void updateListOfServers() {
List servers = new ArrayList();
if (serverListImpl != null) {
// 调 NacosServerList.getUpdatedListOfServers() 法获取服务列表
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
// 更新服务列表
updateAllServerList(servers);
}
protected void updateAllServerList(List ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
页面更新:2024-03-12
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号