标准 专业
多元 极客

Spring Cloud Alibaba研究院(6)——Sentinel——ProcessorSlotChain处理

上文中提到,我们请求的resource,会通过ProcessorSlotChain上的slot进行处理,接下来我们将会对核心的slot进行解析。

NodeSelectorSlot

NodeSelectorSlot用于构建调用链路,调用链路是流量控制权限控制降级控制的基础架构。

我们通过类注释的例子进行讲解:

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();
  1. 首先,创建了一个名称为entrance1的Context,并制定了调用发起者为appA。
  2. 其次,通过Sphu上报了了一次请求名称为nodeA的resource。
  3. 然后,成功获取resource,则释放resource。
  4. 最后,退出Context。

这段逻辑会在代码中生成如下的调用链路:

接下来会对这个调用链路进行扩展:

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();

ContextUtil.enter("entrance2", "appA");
nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();

第一部分代码没有任何变化。

第二部分代码重新创建了一个名称为entrance2的Context,获取的也是名称为nodeA的resource,随后也退出了,那么这段代码生成的调用链路是:

也就是说在请求同一个resource时,可以产生多个entranceNode和DefaultNode,但只会共用同一个ClusterNode。

首先,NodeSelectorSlot私有了一个map集合,用于存储每个调用Context的DefaultNode。

NodoeSelectorSlot#map()

一个思维环绕,Sentinel使用了Context名称代替resource id作为缓存的key,因为Sentinel需要做的是在不同的Context中共享同一个ProcessorSlotChain,所以它采用了缓存Context的方式,使用DefaultNode关联resource。

如果一个resource对应了多个DefaultNode,也就是对应了多个Context,那么如何对请求进行数据统计呢?

每个resource的DefaultNode会共享同一个ClusterNode,在接下来的ClusterBuilderSlot中会给你答案。

NodeSelectorSlot#entry()

我们来看NodeSelectorSlot#entry()方法:

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
    // 获取当前上下文的DefaultNode
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        // 如果不存在当前上下文的数据统计节点,此时就需要为当前上下文创建一个数据统计节点
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = new DefaultNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
            }
            // 构建调用树
            ((DefaultNode)context.getLastNode()).addChild(node);
        }
    }
    // 设置处理节点为当前节点
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  1. 首先,NodeSelectorSlot缓存了context->defaultNode,我们可以根据context name获取已经调用的信息。
  2. 其次,会使用双重校验锁的同步方式创建新的context->defaultNode
  3. 然后,构建调用链路。
  4. 最后,将处理节点设置为当前获取到的DefaultNode,进行下一个slot的业务逻辑。

ClusterBuilderSlot

ClusterBuilderSlot持有了resource运行时的所有数据,在链路后端的具体执行操作时,都会从ClusterNode中获取实时数据,这些数据包括响应时间QPS线程数量异常等信息。

ClusterBuilderSlot#clusterNodeMap

ClusterBuilderSlot也私有了一个缓存map,用于存储resource->clusterNode映射关系。

随着应用运行的时间越长,clusterNodeMap就会变得越稳定,并且使用volatile+乐观锁的形式代替ConcurrentHashMap来保证线程安全。

ClusterBuilderSlot#entry()

我们来看调用链路中ClusterBuilderSlot的实现:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args)
        throws Throwable {
    if (clusterNode == null) {
        synchronized (lock) {
            // 使用同步的方式创建cluster node
            if (clusterNode == null) {
                clusterNode = new ClusterNode();
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);
                clusterNodeMap = newMap;
            }
        }
    }
    node.setClusterNode(clusterNode);

    // 如果上下文中设置了origin信息,需要为此创建一个节点
    if (!"".equals(context.getOrigin())) {
        Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
        context.getCurEntry().setOriginNode(originNode);
    }

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  1. 首先,通过同步校验锁的方式初始化ClusterNode
  2. 其次,将ClusterNode和当前请求的resource关联的ClusterNode进行关联。
  3. 然后,如果请求中带有了origin信息,我们也需要为origin创建一个节点,放入到Context中。
  4. 最后,执行下一个slot的逻辑。

StatisticSlot

StatisticSlot致力于实时数据统计,当进行到StatisticSlot时,Sentinel需要进行ClusterNode的全部数据统计以及DefaultNode针对特定上下文、特定resource的数据统计。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
    try {
        // 继续进行责任链的后续Slot的处理
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        // 后续处理完成之后,回到这里,进行数据统计
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // 更新origin node的数据
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            // 更新inbound类型请求的全局数据
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // 处理注册entry回调任务的通过事件
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // 如果抛出因为优先级问题导致的等待问题,更新节点的并发请求数据
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // 更新origin node的数据
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            // 更新inbound类型请求的全局数据
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // 处理注册entry回调任务的通过事件
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // 阻塞异常
        // 设置当前异常
        context.getCurEntry().setError(e);

        // 更新阻塞数据统计
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            // 更新inbound类型请求的全局数据
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // 处理注册entry回调任务的通过事件
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // 未知的错误
        context.getCurEntry().setError(e);

        // 更新异常QPS数量
        node.increaseExceptionQps(count);
        // 更新origin node的数据
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps(count);
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            // 更新inbound类型请求的全局数据
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }
        throw e;
    }
}

它其实是一个后置统计过程,相当于我们的请求已经成功通过网关的校验,可以正常执行,我们需要对计数器等进行操作。

  1. 首先,它会继续后续的slot处理,等待正常的返回结果。
  2. 如果正常返回,先更新当前node的请求数量、请求通过数,然后更新请求OriginNode的请求线程数和请求通过数,如果是inbound类型的请求,还会更新全局的请求数据,最后处理获取entry成功的回调任务。
  3. 如果开启了优先级策略,Sentinel判定可以进行等待,更新请求线程数即可。
  4. 如果捕获到了BlockException,证明当前请求被Sentinel拒绝,我们需要更新异常数据。
  5. 未知错误的情况下,我们也需要更新异常数据。
  6. 在BlockException和未知异常的情况下,都需要抛出异常,供CtSph进行处理。

SystemSlot

SystemSlot用于校验当前系统数据指标是否超出阈值。

SystemSlot#entry()

我们继续来看SystemSlot#entry()的执行逻辑:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  1. 通过系统规则校验器校验系统数据指标阈值。
  2. 执行下一个slot的逻辑。

SystemRuleManager#checkSystem

public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    // 确认是否开启系统规则校验,默认关闭
    if (!checkSystemStatus.get()) {
        return;
    }

    // 系统规则仅用于inbound流量
    if (resourceWrapper.getType() != EntryType.IN) {
        return;
    }

    // 校验当前QPS是否超出阈值
    double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }

    // 校验当前请求线程数是否超出阈值
    int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }
    // 校验当前平均响应时间是否超出阈值
    double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }

    // 校验当前系统负载是否超出阈值
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }

    // 校验当前CPU使用率是否超出阈值
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
}

SystemCheckManager#checkSystem()用于校验inbound流量对系统的压力是否超出阈值。

它会对当前对系统请求QPS、请求线程数、整体平均响应时间、系统负载、CPU使用率进行检查。

系统检查规则默认是关闭的。

接下来进入具体的网关规则检查。

AuthoritySlot

AuthoritySlot就是我们经常使用到的黑白名单模式,所以它会非常依赖originName,也就是我们的请求来源应用名称。

AuthoritySlot#entry()

我们继续来看AuthoritySlot#entry()的执行逻辑:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
    throws Throwable {
    checkBlackWhiteAuthority(resourceWrapper, context);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  1. 进行黑白名单权限校验。
  2. 执行下一个slot的逻辑。

AuthoritySlot#checkBlackWhiteAuthority()

我们继续来看具体的黑白名单校验逻辑:

void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
    // 获取所有AuthorityRule
    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

    if (authorityRules == null) {
        return;
    }

    // 获取指定resource的AuthorityRule集合
    Set<AuthorityRule> rules = authorityRules.get(resource.getName());
    if (rules == null) {
        return;
    }

    for (AuthorityRule rule : rules) {
        // 对于所有的AuthorityRule,进行规则检查
        if (!AuthorityRuleChecker.passCheck(rule, context)) {
            throw new AuthorityException(context.getOrigin(), rule);
        }
    }
}
  1. 首先,获取系统缓存的所有的权限规则。
  2. 接着,获取针对当前resource的所有权限规则。
  3. 然后,对当前resource的所有权限规则进行遍历,通过权限规则校验器进行检查。
  4. 如果命中了权限规则范围,则抛出AuthorityException异常。

AuthorityRuleChecker#passCheck()

我们继续来看针对每个权限规则的具体校验逻辑:

static boolean passCheck(AuthorityRule rule, Context context) {
    // 获取当前请求的origin名称
    String requester = context.getOrigin();

    // origin名称为空或者没有限制应用,直接判断通过权限规则校验
    if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
        return true;
    }

    // 进行名称全匹配搜索
    int pos = rule.getLimitApp().indexOf(requester);
    boolean contain = pos > -1;

    // 如果包含,就会进行精确匹配搜索
    if (contain) {
        boolean exactlyMatch = false;
        String[] appArray = rule.getLimitApp().split(",");
        for (String app : appArray) {
            if (requester.equals(app)) {
                exactlyMatch = true;
                break;
            }
        }

        contain = exactlyMatch;
    }
    // 获取权限策略方式
    int strategy = rule.getStrategy();
    // 如果策略是黑名单方式,并且包含当前请求的origin,权限规则校验不通过,返回false
    if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
        return false;
    }
    // 如果策略是白名单方式,并且不包含当前请求的origin,权限规则校验不通过,返回false
    if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
        return false;
    }
    // 其他情况下,通过权限规则校验,返回true
    return true;
}
  1. 首先,想要使用权限规则,我们的请求就必须带有请求的应用名称。
  2. 其次,会进行一次全局的索引搜索,如果存在需要进行限制的应用名称,才会分离所有的应用名称,进行精确匹配。
  3. 然后,根据权限规则的策略方式,对黑、白两种方式进行不同的校验。
  4. 如果策略是黑名单方式,并且包含当前请求的应用名称,则也判定权限校验规则不通过。
  5. 如果策略是白名单方式,并且不包含当前请求的应用名称,则判定权限规则校验不通过。
  6. 其他情况下,均判定为通过权限规则校验。

先进行全局搜索的原因在于避免每次都进行for循环操作,降低时间复杂度。

FlowSlot

FlowSlot#entry()

我们继续来看FlowSlot#entry()的执行逻辑:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

  1. 进行流控规则校验。
  2. 执行下一个slot的逻辑。

FlowRuleChecker#checkFlow()

我们继续看流控规则的校验逻辑:

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                        Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // 获取当前resource的所有流控规则集合
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        // 遍历并校验
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
  1. 获取当前resource关联的流控规则。
  2. 遍历关联的每个流控规则,并进行校验。

继续来看#canPassCheck()方法:

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    // 开启集群模式,进行集群模式校验
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // 进行本地流控校验
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
  1. 对规则限制的应用进行校验,默认的流控规则的应用名称是default,意为对所有的应用都进行规则限制。
  2. 如果开启了集群模式,将使用集群规则校验模式进行规则校验。
  3. 否则,使用本地校验模式进行规则校验。

FlowRuleChecker#passClusterCheck()

我们继续来看集群模式的校验逻辑:

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        // 如果没有集群服务,降级到校验本地流控校验
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        // 获取flowId
        long flowId = rule.getClusterConfig().getFlowId();
        // 请求服务端,进行流控校验
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // 降级到本地流控校验,如果规则的服务端或者客户端不可用
    // 如果降级服务也不可用,直接判定为通过
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
  1. 由于是集群模式,我们需要向Token Server节点请求token,请求token首先就需要获取发送请求的client。
  2. 如果没有可用的client,Sentinel就会降级到本地规则校验逻辑。
  3. 构建token请求body,并发送请求。
  4. 处理请求结果。

FlowRuleChecker#fallbackToLocalOrPass()

我们j继续来看集群模式降级为本地模式的逻辑:

private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    // 集群开启了如果请求失败,就降级到本地流控规则校验策略
    if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
        // 本地校验流控规则
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    } else {
        // 至此,流控规则不会生效,直接通过
        return true;
    }
}
  1. 集群模式降级策略通过fallbackToLocalWhenFail属性进行控制,如果开启降级本地策略,则会调用FlowRuleChecker#passLocalCheck()进行处理。
  2. 没有开启集群降级至本地策略,直接判定为通过。

FlowRuleChecker#passLocalCheck()

我们继续来看本地流控规则校验逻辑:

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
  1. 首先,需要选择流控数据来源数据统计节点。
  2. 然后,以数据统计节点的数据为基准,以获取的token数量为需求,进行流控规则的校验。

FlowRuleChecker#selectNodeByRequesterAndStrategy()

我们继续来看数据节点的选取逻辑,通过上一节的介绍,我们了解到,DefaultNode和ClusterNode都可以进行数据统计,而我们还可以设置不同的流控策略:

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // 必须参数校验
    String limitApp = rule.getLimitApp();
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    // 如果origin名称指定了除"default"或"other"之外的origin名称
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        // 并且策略是直接流控控制
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // 直接返回当前上下文正在处理的数据统计节点
            return context.getOriginNode();
        }
        // 否则,使用关联的数据统计节点
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        // 如果origin名称范围是"default"
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // 并且策略是直接流控控制,直接返回ClusterNode作为数据统计节点
            return node.getClusterNode();
        }
        // 否则,使用关联的数据统计节点
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        // 并且策略是直接流控控制,直接返回当前上下文正在处理的数据统计节点
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        // 否则,使用关联的数据统计节点
        return selectReferenceNode(rule, context, node);
    }
    // 策略校验没有通过,没有获取到数据统计节点
    return null;
}
  1. 首先,对origin名称、流控策略、流控限制origin范围进行校验。
  2. 如果流控规则对自定义origin进行了限制:
    1. 如果流控策略是直接流控控制,则返回当前上下文使用的数据统计节点。
    2. 否则,使用关联的数据统计节点。
  3. 如果流控规则是默认的针对所有的请求来源:
    1. 如果流控策略是直接流控控制,则使用ClusterNode作为数据统计节点。
    2. 否则,使用关联的数据统计节点。
  4. 如果流控规则是针对其他请求来源,并且当前请求属于其他请求来源的指定范围内。
    1. 如果策略是直接流控控制,则返回当前上下文正在处理的数据统计节点。
    2. 否则,使用关联的数据统计节点
  5. 至此,策略校验没有通过,没有获取到数据统计节点。

没有数据统计节点,Sentinel不会进行流控。

FlowRuleChecker#selectReferenceNode()

我们继续来看如何选择关联的数据统计节点:

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
    // 获取当前resource关联的resource
    String refResource = rule.getRefResource();
    // 获取当前resource规则的流控策略
    int strategy = rule.getStrategy();
    // 如果没有关联resource,视为没有数据统计节点
    if (StringUtil.isEmpty(refResource)) {
        return null;
    }
    // 如果流控策略是关联流控策略,则使用关联resource的ClusterNode作为数据统计节点
    if (strategy == RuleConstant.STRATEGY_RELATE) {
        return ClusterBuilderSlot.getClusterNode(refResource);
    }
    // 如果流控策略是链式流控策略
    if (strategy == RuleConstant.STRATEGY_CHAIN) {
        // 如果链式调用并不在同一个Context中,则无法获取数据统计节点
        if (!refResource.equals(context.getName())) {
            return null;
        }
        // 否则使用当前Context使用的数据统计节点
        return node;
    }
    // 没有找到合法的节点,返回null
    return null;
}
  1. 首先,此时仅会存在两种策略,关联流控策略链式流控策略,这两种策略都与refResource属性,也就是关联的resource有关,所以需要对refResource属性进行校验。
  2. 如果流控策略是关联流控策略,则使用关联resource的ClusterNode作为数据统计节点。
  3. 如果流控策略是链式流控策略:
    1. 如果链式调用并不在同一个Context中,则无法获取数据统计节点。
    2. 否则使用当前Context使用的数据统计节点。
  4. 至此,没有找到合法的数据统计节点。

获取到了数据统计节点,接下来就会针对不同的流量整形策略,进行不同的流控处理逻辑。

流量整形处理逻辑

Sentinel中流量整形通过TrafficShapingController来实现,

DefaultController

我们来看直接拒绝策略的流控规则校验:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 当前已经被获取的token数量
    int curCount = avgUsedTokens(node);
    // 已经被获取的token数量+本次请求即将获取的数量超出了流控规则设置的阈值
    if (curCount + acquireCount > count) {
        // 如果请求开启了优先级策略,并且流控等级为QPS限流
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            // 获取当前时间戳
            currentTime = TimeUtil.currentTimeMillis();
            // 获取排队等待时间
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            // 如果等待时间处于设置的可等待的时间范围内
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                // 进行等待
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException意味着请求将会在等待{@link @waitInMs}时间之后判定通过
                throw new PriorityWaitException(waitInMs);
            }
        }
        // 否则判定不通过此流控规则
        return false;
    }
    return true;
}
  1. 首先,获取已经被获取的token数量。
  2. 如果当前已经被获取的token+本次请求需要获取的token数量,超出了流控规则设置的阈值:
    1. 在没有开启优先级策略的情况下,判定命中流控规则,进行阻塞。
    2. 在开启优先级策略的情况下,需要获取当前滑动窗口需要进行等待的时间,进行等待后,判定请求通过,抛出PriorityWaitException异常。
  3. 如果当前已经被获取的token+本次请求需要获取的token数量,没有超出了流控规则设置的阈值,则判定通过此流控规则。

WarmUpController

我们来看预热模式的流控规则校验:

WarmUpController#canPass()
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();

    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);

    // 开始计算它的斜率
    // 如果进入了警戒线,开始调整他的qps
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // 消耗的速度要比warning快,但是要比慢
        // current interval = restToken*slope+1/count
        // 通过计算当前可用token和警戒线的距离,来计算当前的QPS
        // aboveToken * slope + 1.0 / count = 处于预热区内,当前比率下,获取token的时间
        // 1/获取token的时间,即为当前的QPS
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        // 如果处于热区域内,没有超过规则设定的阈值,则判定通过
        if (passQps + acquireCount <= count) {
            return true;
        }
    }
	  // 否则,判定不通过
    return false;
}
  1. 首先,获取当前QPS和上一秒的QPS,针对上一秒的QPS,更新可获取的token数量。
  2. 如果剩余的token数量进入了预热区,我们需要计算当前的预热QPS,如果当前QPS+本次需要获取的token数量没有过预热QPS,证明是在预热区内是可以接受本次请求的,判定通过。
  3. 如果处于热区域内,没有超过规则设定的阈值,也判定通过。
  4. 其他情况,均判定不通过。
WarmUpController#syncToken()

我们继续来看扩容token数量的逻辑:

protected void syncToken(long passQps) {
    // 获取当前的时间戳
    long currentTime = TimeUtil.currentTimeMillis();
    // 精确到秒的时间戳
    currentTime = currentTime - currentTime % 1000;
    // 获取上一次扩容的时间
    long oldLastFillTime = lastFilledTime.get();
    // 如果时间不对,不进行同步
    if (currentTime <= oldLastFillTime) {
        return;
    }
    // token数量旧值
    long oldValue = storedTokens.get();
    // 计算新的token数量
    long newValue = coolDownTokens(currentTime, passQps);
    // 更新token的值
    if (storedTokens.compareAndSet(oldValue, newValue)) {
        // 更新当前可用的值
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        // 设置上一次扩容的时间戳
        lastFilledTime.set(currentTime);
    }
}
  1. 首先,token数量的添加,要以过去的秒时间单位进行统计。
  2. 其次,我们需要获取上一次进行扩容token的去掉毫秒位的时间戳已经当前去掉毫秒位的时间戳。
  3. 通过预热非预热模式来决定扩容的token数量。
  4. 更新剩余的token数量和更新时间戳。
WarmUpController#coolDownTokens()

我们继续来看获取扩充token数量的逻辑:

private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // 添加token的判断前提条件:
    // 当token的消耗程度远远低于警戒线的时候
    if (oldValue < warningToken) {
        // 处于长方形区域内,属于热区,正常添加token
        // 根据count数,每秒扩充token
        newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        // 当前判定仍处于冷启动阶段
        // 太绕了,我佛了
        // 当前最慢的QPS计算公式:1 / (coldFactor * (1 / count)) => count / coldFactor
        // 当前的QPS已经比最慢的还慢了,需要添加token
        if (passQps < (int)count / coldFactor) {
            // 否则就需要添加token
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    // 取计算出的token新值和容量上限的二者的最小值
    return Math.min(newValue, maxToken);
}
  1. 当token处于热区域内时,证明服务处于正常状态,按照已经过去的秒级时间正常扩充即可。
  2. 当token处于预热区域内时,首先我们需要计算出最慢的QPS,如果当前QPS已经比最慢的QPS还慢了,则需要添加token。
  3. token总数量不能超过maxToken的上限。

预热策略需要注意计算公式。

RateLimiterController

我们来看匀速排队策略的实现:

RateLimiterController#canPass()
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 如果请求的token是非正整数,直接通过
    if (acquireCount <= 0) {
        return true;
    }
    // 如果当前可获取的token数量是非正整数,判定不通过
    // 否则,costTime的计算将是最大的,并且在某些情况下,等待时间会溢出
    if (count <= 0) {
        return false;
    }
    // 获取当前的时间戳
    long currentTime = TimeUtil.currentTimeMillis();
    // 计算两次请求之间的时间差,把请求均匀分在1s上
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // 当前请求预期通过的时间戳
    long expectedTime = costTime + latestPassedTime.get();
    // 如果预期通过的时间不是未来的时间戳
    if (expectedTime <= currentTime) {
        // 更新上一次通过的时间,本次请求通过
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // 如果期待的时间是在未来的时间戳
        // 计算等待时间
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        // 如果等待时间大于等待时间,判定不通过
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            // 上次时间+本次需要耗费的时间
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                // 获取最新的等待时间
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                // 再次判断下是否超过最大等待时间
                if (waitTime > maxQueueingTimeMs) {
                    // 需要阻塞,将上次通过的时间改回去
                    // 刚开始认为这是个bug,后来了解到
                    // 每个请求在设置oldTime时都会通过addAndGet()原子操作lastestPassedTime,并赋值给oldTime
                    // 这样每个线程的sleep()时间都不相同,线程也不会同时醒过来
                    // costTime+latestPassedTime,大于当前时间就代表请求的比较频繁,需要等待一会才能继续请求
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // 在条件竞争中,等待时间很可能是非正整数,
                if (waitTime > 0) {
                    // 如果可以等待,则等待相应时间
                    Thread.sleep(waitTime);
                }
                // 如果等待时间是非负整数,无需等待,直接判定通过
                // 判定通过
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    // 其余情况,判定为不通过
    return false;
}
  1. 首先,对请求的token数量,当前可获取的token数量进行校验。
  2. 然后,计算两次请求之间的时间差,将请求的token数量均匀分布在1s上,计算出的请求时间差+上一次请求通过的时间就是本次请求预期通过的时间。
  3. 如果预期通过的时间,小于当前时间戳,证明剩余可获取的token满足本次请求获取token的数量,直接判定通过,否则就需要排队等待。
  4. 首先计算当前请求获取给定token数量还需要等待的时间,如果等待时间大于排队时间,则无法进行排队,直接判定不通过。
  5. 如果可以进行排队,重新判断一下是否进行排队,如果仍可进行排队,则当前请求占有了队列中的位置,在等待时间内进行等待后,判定通过。
  6. 如果不可以进行排队,则将最后一次通过请求的时间戳更新为上一次通过请求的时间戳,判定不通过。

这里有一个很巧妙的设计,每次请求在设置oldTime时,都会通过addAndGet()原子操作lastestPassedTime,这样每个线程的sleep()时间都不相同,线程大概率不会同时醒过来。

WarmUpRateLimiterController

WarmUpControllerRateLimiterController代码类似,可以自行查看。

我们默认可以通过http://localhost:8719/tree地址来查看每个资源的实时统计,也可以在Dashboard进行查看。

DegradeSlot

DegradeSlot用于对降级规则进行校验。

DegradeSlot#entry()

我们继续来看DegradeSlot#entry()的执行逻辑:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
    throws Throwable {
    // 校验降级规则
    DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
    // 进行下一个ProcessorSlot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  1. 进行降级规则校验。
  2. 执行下一个slot的逻辑。

DegradeRuleManger#checkDegrade()

我们继续来降级规则的校验逻辑:

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
    throws BlockException {
    // 获取当前resource的所有降级规则
    Set<DegradeRule> rules = degradeRules.get(resource.getName());
    if (rules == null) {
        return;
    }
    // 遍历每个降级规则,分别进行检查
    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, node, count)) {
            // 命中降级规则,抛出DegradeException
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}
  1. 首先,从缓存的全量降级集合中获取当前resource的降级规则。
  2. 其次,遍历每个降级规则,分别进行降级规则检查。
  3. 最后,如果命中了降级规则,抛出DegradeException。

DegradeRule#passCheck()

我们继续来看降级规则的对降级的处理逻辑:

@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
    // 获取乐观锁
    if (cut.get()) {
        return false;
    }
    // 获取ClusterNode,用于提供Cluster数据信息
    ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
    if (clusterNode == null) {
        return true;
    }

    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        // 响应时间降级策略
        // 从ClusterNode获取实时统计的成功请求的平均响应时间
        double rt = clusterNode.avgRt();
        // 如果计算出的平均响应时间小于阈值,判定通过
        if (rt < this.count) {
            passCount.set(0);
            return true;
        }

        // 否则一经超过阈值,将启动降级策略
        // 如果接下来的五次请求依然超出了阈值,才会走最后的降级策略
        if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
            // 接下来的五次请求,如果没有超过阈值,那么依然会判定通过降级检查
            return true;
        }
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        // 异常比率策略
        // 获取当前resource的发生异常QPS
        double exception = clusterNode.exceptionQps();
        // 获取当前resource的成功请求QPS
        double success = clusterNode.successQps();
        // 获取当前resource的总QPS
        double total = clusterNode.totalQps();
        // 如果总QPS小于阈值上线,判定通过降级规则检查
        if (total < RT_MAX_EXCEED_N) {
            return true;
        }
        // 真正成功的请求数=成功请求数-异常请求数
        double realSuccess = success - exception;
        // 如果没有真正成功的请求数,并且异常数量小于设定的阈值
        if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {
            // 判定通过降级规则检查
            return true;
        }
        // 如果异常比率占成功比率,没有超过设定的阈值
        if (exception / success < count) {
            // 判定通过降级规则检查
            return true;
        }
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        // 获取当前resource出现的异常数量,统计的是每分钟的数量
        double exception = clusterNode.totalException();
        // 如果resource每分钟的异常数量没有超出设定的阈值
        if (exception < count) {
            // 判定通过降级规则检查
            return true;
        }
    }
    // 总结一下,只有在以下情况会走到这里
    // 平均响应时间:计算的平均响应时间大于阈值,接下来的五次请求也已经超过缓冲的阈值
    // 异常比率:每秒钟出现的异常数量已经超过设定的阈值,有真正成功的请求,或者异常数量超过缓冲的异常数量
    // 异常数量:每分钟出现的异常数量已经超过阈值
    // 首先乐观锁设置为不可用
    if (cut.compareAndSet(false, true)) {
        // 需要进行重置
        ResetTask resetTask = new ResetTask(this);
        // 根据降级时间进行恢复
        pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
    }

    return false;
}
  1. 首先,降级数据信息来源只会来源于ClusterNode,因为降级只和resource全局的信息有关。
  2. 如果设置的降级策略是根据平均响应时长,我们会根据ClusterNode提供的平均响应时长进行判定:
    1. 在设定的阈值范围内,判定通过。
    2. 否则,在接下来的五次请求,如果没有超过设定的阈值,那么也判定通过。
    3. 其他情况下,判定不通过。
  3. 如果设置的降级策略是根据出现异常比率,我们会从ClusterNode中分别获取异常请求QPS成功请求QPS总QPS
    1. 如果总的QPS还没有超过设定的阈值,直接判定通过。
    2. 如果没有真正成功的请求,并且出现异常的请求数量没有超过Sentinel设定的阈值,判定通过。
    3. 前面均是针对异常请求QPS的限制,此时开始计算比率,如果异常QPS/成功QPS,小于设定的阈值,判定通过。
    4. 其他情况,均判定不通过。
  4. 如果设置的降级策略是根据异常个数,则统计每分钟出现的异常数量,没有超出设定的阈值,判定通过。
  5. 以上没有判定通过的情况,均会走到这里,也就是对当前窗口的数据进行重置:
    1. 平均响应时间:计算的平均响应时间大于阈值,接下来的五次请求也已经超过缓冲的阈值。
    2. 异常比率:每秒钟出现的异常数量已经超过设定的阈值,有真正成功的请求,或者异常数量超过缓冲的异常数量。
    3. 异常数量:每分钟出现的异常数量已经超过阈值。

    重置时,由于开启了乐观锁,所以所有请求都判定为不通过,服务将在降级恢复时间内重置完成后恢复。

异常比率降级策略中,异常QPS成功请求QPS总QPS,都是异常数量/窗口时间间隔,单位秒。

 

赞(3) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫