How Sentinel's Flow Control Takes Effect

1. Introduction to Sentinel

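Sentinel is a traffic governance component for distributed, multi-language, heterogeneous service architectures. It takes traffic as its entry point and helps users keep microservices stable along several dimensions: flow control, traffic routing, circuit breaking and degradation, and adaptive system protection.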

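As microservices have become mainstream, stability between services matters more and more. As a company's user base and traffic grow, the pressure on the database keeps rising. Sentinel takes traffic as its entry point and protects service stability through flow control, circuit breaking and degradation, and system load protection.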

2. Source Code Entry Point

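There are two ways to use Sentinel. One is to add the @SentinelResource annotation to the resource method that needs rate limiting or degradation; the other is to protect resources through an interceptor. Both paths end up executing the same core code. In this article we follow the annotation (AOP aspect) path to see how the Sentinel source implements rate limiting and degradation.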

Under the hood this relies on Spring Boot auto-configuration: the spring.factories file in spring-cloud-starter-alibaba-sentinel.jar registers the SentinelAutoConfiguration class. Let's look at the source of this class.


When the Spring container starts, it instantiates a SentinelResourceAspect. As the name suggests, this is an aspect class. Let's open it:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);

        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            return pjp.proceed();
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

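This aspect is built on AOP, and its pointcut is the @SentinelResource annotation mentioned above. So when an interface method carries @SentinelResource, the aspect's invokeResourceWithSentinel method runs before the method's business logic. To see how the aspect works, we focus on the call SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()):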


public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
    throws BlockException {
    return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}

Env.sph is a static field, so the first access to it initializes the Env class, and that runs the static block:

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}
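To make the trigger concrete, here is a minimal sketch of a static initializer that runs once on first access to a static field. The names InitLog, EnvLike and StaticInitDemo are hypothetical and only mirror the shape of Env; this is not Sentinel code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: records every run of the static initializer below.
class InitLog {
    static final List<String> EVENTS = new ArrayList<>();
}

// Hypothetical stand-in that mirrors the shape of Env.
class EnvLike {
    // Not a compile-time constant, so reading it forces class initialization.
    static final Object SPH = new Object();

    static {
        // Runs exactly once, when the JVM initializes EnvLike.
        InitLog.EVENTS.add("doInit");
    }
}

class StaticInitDemo {
    // Reads the static field twice and reports how often the static block ran.
    static int initCountAfterTwoReads() {
        Object first = EnvLike.SPH;   // first access triggers the static block
        Object second = EnvLike.SPH;  // later accesses do not run it again
        return first == second ? InitLog.EVENTS.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println(initCountAfterTwoReads()); // prints 1
    }
}
```

The point of the pattern is that initialization is tied to first use of the class, so no explicit startup call is needed.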

Next, let's see what InitExecutor.doInit() actually does:

public static void doInit() {
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : initFuncs) {
            RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
            insertSorted(initList, initFunc);
        }
        for (OrderWrapper w : initList) {
            w.func.init();
            RecordLog.info("[InitExecutor] Executing {} with order {}",
                w.func.getClass().getCanonicalName(), w.order);
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
        error.printStackTrace();
    }
}
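The guard at the top of doInit() is a compare-and-set on an atomic flag. A minimal sketch of that idiom, under the assumption that the real work can be represented by a counter (OnceInitializer and its methods are hypothetical names, not part of Sentinel):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class illustrating a run-once guard built on compareAndSet.
class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    void doInit() {
        // Only the caller that flips false -> true proceeds; all others return.
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        runs.incrementAndGet(); // stands in for the real initialization work
    }

    int runs() {
        return runs.get();
    }

    // Calls doInit() from several threads and reports how often the work ran.
    static int runConcurrently(int threadCount) {
        OnceInitializer init = new OnceInitializer();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(init::doInit);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return init.runs();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8)); // prints 1
    }
}
```

Note that with this idiom a thread that loses the race returns immediately; it does not wait for the winning thread to finish initializing.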

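It first uses CAS for thread safety, so that only one thread performs the initialization even under concurrency. The core logic is SpiLoader.of(InitFunc.class).loadInstanceListSorted(), which in turn calls the load method: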

/**
 * Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation
 *
 * @return Sorted Provider instances list
 */
public List<S> loadInstanceListSorted() {
    load();

    return createInstanceList(sortedClassList);
}

    /**
     * Load the Provider class from Provider configuration file
     */
    public void load() {
        if (!loaded.compareAndSet(false, true)) {
            return;
        }

        String fullFileName = SPI_FILE_PREFIX + service.getName();
        ClassLoader classLoader;
        if (SentinelConfig.shouldUseContextClassloader()) {
            classLoader = Thread.currentThread().getContextClassLoader();
        } else {
            classLoader = service.getClassLoader();
        }
        if (classLoader == null) {
            classLoader = ClassLoader.getSystemClassLoader();
        }
        Enumeration<URL> urls = null;
        try {
            urls = classLoader.getResources(fullFileName);
        } catch (IOException e) {
            fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);
        }

        if (urls == null || !urls.hasMoreElements()) {
            RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
            return;
        }

        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();

            InputStream in = null;
            BufferedReader br = null;
            try {
                in = url.openStream();
                br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = br.readLine()) != null) {
                    if (StringUtil.isBlank(line)) {
                        // Skip blank line
                        continue;
                    }

                    line = line.trim();
                    int commentIndex = line.indexOf("#");
                    if (commentIndex == 0) {
                        // Skip comment line
                        continue;
                    }

                    if (commentIndex > 0) {
                        line = line.substring(0, commentIndex);
                    }
                    line = line.trim();

                    Class<S> clazz = null;
                    try {
                        clazz = (Class<S>) Class.forName(line, false, classLoader);
                    } catch (ClassNotFoundException e) {
                        fail("class " + line + " not found", e);
                    }

                    if (!service.isAssignableFrom(clazz)) {
                        fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);
                    }

                    classList.add(clazz);
                    Spi spi = clazz.getAnnotation(Spi.class);
                    String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
                    if (classMap.containsKey(aliasName)) {
                        Class<? extends S> existClass = classMap.get(aliasName);
                        fail("Found repeat alias name for " + clazz.getName() + " and "
                                + existClass.getName() + ",SPI configuration file=" + fullFileName);
                    }
                    classMap.put(aliasName, clazz);

                    if (spi != null && spi.isDefault()) {
                        if (defaultClass != null) {
                            fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
                        }
                        defaultClass = clazz;
                    }

                    RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
                            + ", isSingleton={}, isDefault={}, order={}",
                        service.getName(), line, aliasName
                            , spi == null ? true : spi.isSingleton()
                            , spi == null ? false : spi.isDefault()
                            , spi == null ? 0 : spi.order());
                }
            } catch (IOException e) {
                fail("error reading SPI configuration file", e);
            } finally {
                closeResources(in, br);
            }
        }

        sortedClassList.addAll(classList);
        Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
            @Override
            public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                Spi spi1 = o1.getAnnotation(Spi.class);
                int order1 = spi1 == null ? 0 : spi1.order();

                Spi spi2 = o2.getAnnotation(Spi.class);
                int order2 = spi2 == null ? 0 : spi2.order();

                return Integer.compare(order1, order2);
            }
        });
    }

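The load method reads the provider configuration files under META-INF/services (for example, the ones in sentinel-core.jar) and loads every implementation class listed there; createInstanceList then instantiates them through reflection.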
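The line handling inside load() (skip blank lines, skip comment lines, cut trailing comments) can be sketched in isolation. SpiFileParser is a hypothetical helper written for this article; it takes the file content as a list of lines instead of reading from the classpath, and it does not load any classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring only the line-parsing rules of load().
class SpiFileParser {
    /** Returns the provider class names declared in the given lines. */
    static List<String> parse(List<String> lines) {
        List<String> classNames = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // skip blank line
            }
            int commentIndex = line.indexOf('#');
            if (commentIndex == 0) {
                continue; // skip comment line
            }
            if (commentIndex > 0) {
                // Drop a trailing comment after the class name.
                line = line.substring(0, commentIndex).trim();
            }
            classNames.add(line);
        }
        return classNames;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "# Sentinel default ProcessorSlots",
            "",
            "  com.example.FooSlot  # trailing note",
            "com.example.BarSlot");
        System.out.println(parse(lines)); // prints [com.example.FooSlot, com.example.BarSlot]
    }
}
```

In the real loader each surviving line is then passed to Class.forName and checked against the service interface before it is registered.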



Now let's go back and step into the entryWithType method:

@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
    throws BlockException {
    return entryWithType(name, resourceType, entryType, count, false, args);
}

  @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                               Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return entryWithPriority(resource, count, prioritized, args);
    }

  private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

The call eventually reaches entryWithPriority, which uses the chain-of-responsibility pattern. It first builds a processing chain, so let's look at lookProcessChain(resourceWrapper):


ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
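The caching shape in lookProcessChain (lock-free read, double check under a lock, then replace the whole map) can be sketched generically. CopyOnWriteCache is a hypothetical class, and the sketch assumes the map field is volatile so that readers always see a fully built map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical generic version of the copy-on-write lookup shown above.
class CopyOnWriteCache<K, V> {
    private final Object lock = new Object();
    private final int maxSize;
    private final Supplier<V> factory;
    // Readers see either the old map or the new one, never a half-updated map.
    private volatile Map<K, V> map = new HashMap<>();

    CopyOnWriteCache(int maxSize, Supplier<V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    /** Returns the cached value, creating it on first use; null once the cap is reached. */
    V lookup(K key) {
        V value = map.get(key); // lock-free fast path
        if (value == null) {
            synchronized (lock) {
                value = map.get(key); // re-check: another thread may have created it
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null; // size cap reached, caller must cope without a value
                    }
                    value = factory.get();
                    Map<K, V> newMap = new HashMap<>(map);
                    newMap.put(key, value);
                    map = newMap; // publish the new map in one volatile write
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        CopyOnWriteCache<String, Object> cache = new CopyOnWriteCache<>(2, Object::new);
        System.out.println(cache.lookup("a") == cache.lookup("a")); // prints true
    }
}
```

The design trades a more expensive write (copying the map) for reads that never take a lock, which fits a cache that is written once per resource and read on every request.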

It first looks in a cache map. If there is no entry, it calls SlotChainProvider.newSlotChain() to build a chain and stores it in the map:

public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder != null) {
        return slotChainBuilder.build();
    }

    // Resolve the slot chain builder SPI.
    slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

    if (slotChainBuilder == null) {
        // Should not go through here.
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        slotChainBuilder = new DefaultSlotChainBuilder();
    } else {
        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
            slotChainBuilder.getClass().getCanonicalName());
    }
    return slotChainBuilder.build();
}

The same JDK-style SPI mechanism is at work here, and the ProcessorSlot implementations configured under META-INF/services in sentinel-core.jar get loaded:

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

Finally, the build method of DefaultSlotChainBuilder assembles the processor chain, as shown in the figure below.


Then chain.entry(context, resourceWrapper, null, count, prioritized, args) is called, which invokes each slot in the figure in turn. Here we focus on the entry methods of FlowSlot and DegradeSlot:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
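The "do my check, then fireEntry" shape above is what makes the slots a chain of responsibility. A minimal sketch with hypothetical classes (DemoSlot, RecordingSlot, BlockingSlot, SlotChainDemo), using IllegalStateException as a stand-in for BlockException:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical slot base class: each slot knows only its successor.
abstract class DemoSlot {
    private DemoSlot next;

    // Links the successor and returns it so calls can be chained.
    DemoSlot linkTo(DemoSlot next) {
        this.next = next;
        return next;
    }

    abstract void entry(String resource, List<String> trace);

    // Hands control to the next slot, if any.
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// A slot that only records that it ran.
class RecordingSlot extends DemoSlot {
    private final String name;

    RecordingSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

// A slot that may reject the request before the rest of the chain runs.
class BlockingSlot extends DemoSlot {
    private final boolean block;

    BlockingSlot(boolean block) {
        this.block = block;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add("flow");
        if (block) {
            throw new IllegalStateException("blocked: " + resource);
        }
        fireEntry(resource, trace);
    }
}

class SlotChainDemo {
    // Builds statistic -> flow -> degrade and returns the order of execution.
    static List<String> run(boolean block) {
        DemoSlot head = new RecordingSlot("statistic");
        head.linkTo(new BlockingSlot(block)).linkTo(new RecordingSlot("degrade"));
        List<String> trace = new ArrayList<>();
        try {
            head.entry("demo", trace);
        } catch (IllegalStateException e) {
            trace.add("blocked");
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [statistic, flow, degrade]
        System.out.println(run(true));  // prints [statistic, flow, blocked]
    }
}
```

When a slot throws, every slot after it is skipped, which is why the position of a slot in the chain matters.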

FlowSlot's entry method first validates the flow rules configured in the console by calling checkFlow:

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
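The loop above stops at the first rule that does not pass. A self-contained sketch of that behavior follows; DemoRule, DemoFlowException and DemoRuleChecker are hypothetical, and the pass check (current count plus requested count must not exceed a threshold) is an assumption made for illustration, not Sentinel's actual canPassCheck logic.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Function;

// Hypothetical stand-in for FlowException.
class DemoFlowException extends Exception {
    DemoFlowException(String message) {
        super(message);
    }
}

// Hypothetical rule with a simple threshold.
class DemoRule {
    final String name;
    final int threshold;

    DemoRule(String name, int threshold) {
        this.name = name;
        this.threshold = threshold;
    }

    // Assumed pass check: usage after this request must stay within the threshold.
    boolean canPass(int currentCount, int acquire) {
        return currentCount + acquire <= threshold;
    }
}

class DemoRuleChecker {
    // Mirrors the loop shape of checkFlow: the first failing rule throws.
    static void check(Function<String, Collection<DemoRule>> ruleProvider, String resource,
                      int currentCount, int acquire) throws DemoFlowException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<DemoRule> rules = ruleProvider.apply(resource);
        if (rules == null) {
            return; // no rules configured for this resource
        }
        for (DemoRule rule : rules) {
            if (!rule.canPass(currentCount, acquire)) {
                throw new DemoFlowException(rule.name);
            }
        }
    }

    // Returns "pass" or the name of the first violated rule for resource "orders".
    static String firstViolation(int currentCount) {
        Function<String, Collection<DemoRule>> provider = resource -> {
            if ("orders".equals(resource)) {
                return Arrays.asList(new DemoRule("limit-10", 10), new DemoRule("limit-5", 5));
            }
            return null;
        };
        try {
            check(provider, "orders", currentCount, 1);
            return "pass";
        } catch (DemoFlowException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstViolation(3));  // prints pass
        System.out.println(firstViolation(5));  // prints limit-5
        System.out.println(firstViolation(10)); // prints limit-10
    }
}
```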

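Collection<FlowRule> rules = ruleProvider.apply(resource.getName()) fetches the flow rules configured for this resource in the Sentinel console. Each rule is checked in turn, and the first one that fails throws a FlowException, which ends the execution of the chain. StatisticSlot, which ran earlier in the chain, wraps the downstream call in try/catch so it can record the block; the exception then propagates to SentinelResourceAspect, which catches the BlockException and handles it: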

@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);

    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        // Should not go through here.
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        return pjp.proceed();
    } catch (BlockException ex) {
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list will be checked first.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            traceException(ex);
            return handleFallback(pjp, annotation, ex);
        }

        // No fallback function can handle the exception, so throw it out.
        throw ex;
    } finally {
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}

protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
    throws Throwable {

    // Execute block handler if configured.
    Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
        annotation.blockHandlerClass());
    if (blockHandlerMethod != null) {
        Object[] originArgs = pjp.getArgs();
        // Construct args.
        Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
        args[args.length - 1] = ex;
        return invoke(pjp, blockHandlerMethod, args);
    }

    // If no block handler is present, then go to fallback.
    return handleFallback(pjp, annotation, ex);
}
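The argument handling in handleBlockException (copy the original arguments, append the exception as the last one, then invoke the handler) can be tried out with plain reflection. OrderService, DemoBlockedException and BlockHandlerInvoker are hypothetical names, and the handler lookup by name and parameter types is a simplification of what extractBlockHandlerMethod does.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical stand-in for BlockException.
class DemoBlockedException extends Exception {
    DemoBlockedException(String message) {
        super(message);
    }
}

// Hypothetical service: the handler takes the original parameters plus the exception.
class OrderService {
    public String query(String id) {
        return "order-" + id;
    }

    public String queryBlocked(String id, DemoBlockedException ex) {
        return "blocked:" + id + ":" + ex.getMessage();
    }
}

class BlockHandlerInvoker {
    // Finds the handler whose signature is (original parameters..., exception) and invokes it.
    static Object invokeHandler(Object target, String handlerName, Class<?>[] originTypes,
                                Object[] originArgs, DemoBlockedException ex)
            throws ReflectiveOperationException {
        Class<?>[] types = Arrays.copyOf(originTypes, originTypes.length + 1);
        types[types.length - 1] = DemoBlockedException.class;
        Method handler = target.getClass().getMethod(handlerName, types);

        // Same construction as in handleBlockException: original args plus the exception.
        Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
        args[args.length - 1] = ex;
        return handler.invoke(target, args);
    }

    // Simulates a blocked call to query("42") being routed to queryBlocked.
    static String demo() {
        try {
            return (String) invokeHandler(new OrderService(), "queryBlocked",
                new Class<?>[] {String.class}, new Object[] {"42"},
                new DemoBlockedException("flow limit"));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints blocked:42:flow limit
    }
}
```

This is why a block handler has to repeat the parameter list of the protected method and add the exception as its final parameter.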

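In the end, the handler method configured on the @SentinelResource annotation is looked up and invoked, which completes the Sentinel flow-control protection path. How the chain collects its metrics, how it decides whether QPS exceeds the configured limit, and when the circuit breaker opens, half-opens, and closes all involve fairly complex algorithms; a separate article will cover them later.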

Page updated: 2024-03-07

