标准 专业
多元 极客

Spring Cloud Alibaba研究院(5)——Sentinel——资源上报

在上一节中提到@SentinelResource注解,在注解中,是通过切面的形式进行上报,上报引擎是通过Env类型进行初始化的。

public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    return Env.sph.entry(name, type, count, args);
}

Sph接口

Sph是一个用于进行resource保护的接口,如果触发了阻塞标准,会抛出BlockException异常,而它的返回值是一个Entry,如果请求获取到了这个Entry,证明允许调用通过。

Sph的实现类是CtSph,用于实现resource使用情况的所有上报逻辑,我们来看核心方法#entry()

@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
   // 由于传入的是资源名称,使用StringResourceWrapper
   StringResourceWrapper resource = new StringResourceWrapper(name, type);
   // 进行资源上报
   return entry(resource, count, args);
}

ResourceWrapper

ResourceWrapper是Sentinel对我们请求resource的内部通用包装类,用于判断两种resource是否相等,总共具有两个参数:

/**
 * resource名称
 */
protected String name;
/**
 * 请求类型
 */
protected EntryType type = EntryType.OUT;
  • name,名称,即为我们请求的资源名称。
  • type,请求的类型,总体分为两种,入口请求出口请求(可以理解为Inbound和Outbound)。

它的核心方法是#equals()

/**
 * 只考虑{@link #getName()}
 */
@Override
public boolean equals(Object obj) {
    if (obj instanceof ResourceWrapper) {
        ResourceWrapper rw = (ResourceWrapper)obj;
        return rw.getName().equals(getName());
    }
    return false;
}

比较的两个类型必须同是ResourceWrapper类型,但只比较两个请求资源的名称。

通过这个方法,可以实现通过资源名称、资源方法进行请求,使用的还是同一个资源。

ResourceWrapper目前为止共有两个实现类:

  • StringResourceWrapper:通用的ResourceWrapper,入参是resource name。
  • MethodResourceWrapper:方法调用的ResourceWrapper,入参是method。

在进入上报流程后,我们对资源的使用都会通过ResourceWrapper这个包装类进行

我们继续看CtSph#entry()方法。

CtSph#entry()

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
   return entryWithPriority(resourceWrapper, count, false, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
      throws BlockException {
   Context context = ContextUtil.getContext();
   // 如果当前线程使用的是NullContext,证明已经不会被规则控制
   if (context instanceof NullContext) {
      // 只实例化entry即可,无需校验规则
      return new CtEntry(resourceWrapper, null, context);
   }
   // 当前线程还没有上线文,使用默认的上下文
   if (context == null) {
      context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
   }

   // Sentinel的全局开关关闭,也不会进行规则校验
   if (!Constants.ON) {
      return new CtEntry(resourceWrapper, null, context);
   }
   // 此时已经证明了有Context
   // 创建ProcessorSlot
   ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

   // chain为null意味着已经超过slot chain的总数上限,不会进行规则校验
   // 创建Entry之后直接返回
   if (chain == null) {
      return new CtEntry(resourceWrapper, null, context);
   }

   Entry e = new CtEntry(resourceWrapper, chain, context);
   try {
      // 在链上申请,相当于过一遍Sentinel
      // 由于链上插满了可扩展的槽,比如流控,降级都是可扩展的槽,此方法相当于按照槽的顺序过滤了一遍
      chain.entry(context, resourceWrapper, null, count, prioritized, args);
   } catch (BlockException e1) {
      // 触发规则限制,抛出异常
      e.exit(count, args);
      throw e1;
   } catch (Throwable e1) {
      // Sentinel内部错误,比较少见,记录下来,用于反馈
      RecordLog.info("Sentinel unexpected exception", e1);
   }
   return e;
}
  1. 首先,需要创建当前外部请求的上下文。
  2. 其次,Sentinel具有默认开启功能的全局开关,校验此开关是否关闭。
  3. 然后,我们会根据上线文获取进行网关操作的责任链。
  4. 接着,我们会将当前请求的resource,依次通过责任链进行处理。
  5. 最后,我们会对不同的返回结果进行处理。

Context

Context存储了请求调用的上下文信息,包括上下文名称、调用树的根节点、当前正在处理的节点、当前上下文的Origin名称等信息。

所有的外部请求,都离不开上下文。

CtSph#lookProcessChain()

CtSph#lookProcessChain()方法用于获取resource的ProcessorSlotChain

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 从缓存中获取当前resource关联的ProcessorSlotChain
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    // 如果当前resource没有关联,同步下创建一个新的处理链
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            // 双重校验
            if (chain == null) {
                // entry大小限制,不能超过SLOT_CHAIN的阈值
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                // 使用SlotChainProvider创建一个新的slot chain
                chain = SlotChainProvider.newSlotChain();
                // 迁移并缓存新创建的slot chain
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

通过ResourceWrapper#equals()方法判断相等的resource会全局共用ProcessorSlotChain,并且与上下文无关。

  1. 首先,并不是每次请求都会重新创建一条处理链,针对resource,Sentinel会对其进行缓存。
  2. 其次,首次请求对应,会在双重校验锁的同步状态下,创建处理链。
  3. 接着,使用SlotChainProvider创建新的slot chain。
  4. 最后,迁移缓存集合。

由于每个resource的slot chain可以全局复用,所以Sentinel要求,每个节点上的处理链个数不能超过6000个。

SlotChainProvider#newSlotChain()

SlotChainProvider#newSlotChain()用于创建slot chain以及chain上的一系列配件。

public static ProcessorSlotChain newSlotChain() {
    // 可以使用建造器构建的情况下,使用建造器构造
    if (builder != null) {
        return builder.build();
    }
    // 没有建造器,会创建一个建造器
    // 使用自定义建造器或者默认建造器
    resolveSlotChainBuilder();
    // 如果仍为空,可能在加载builder中出现异常状态,使用DefaultSlotChainBuilder
    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    // 返回建造器构建的slot chain
    return builder.build();
}
  1. 首先,slot chain是通过建造者模式,以固定模型进行构建的。
  2. 其次,如果当前没有slot chain的建造器,就需要找到对应的建造器。
  3. 然后,如果在源码中没有找到任何建造器,则使用DefaultSlotChainBuilder建造器。
  4. 最后,使用计算出的建造器构建slot chain。

#resolveSlotChainBuilder()

SlotChainProvider#resolveSlotChainBuilder()用于加载指定的slot chain建造器,出现多个时,仅会添加一个,和指定的顺序有关。

private static void resolveSlotChainBuilder() {
    List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
    boolean hasOther = false;
    // 遍历通过SPI加载的slot chain
    for (SlotChainBuilder builder : LOADER) {
        if (builder.getClass() != DefaultSlotChainBuilder.class) {
            hasOther = true;
            list.add(builder);
        }
    }
    // 获取自定义的DefaultSlotChainBuilder建造者
    if (hasOther) {
        builder = list.get(0);
    } else {
        // 没有自定义的builder,使用默认的DefaultSlotChainBuilder进行构造
        builder = new DefaultSlotChainBuilder();
    }

    RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
            + builder.getClass().getCanonicalName());
}
  1. 首先,我们可以通过SPI指定SlotChainBuilder接口的方式,自定义slot chain的建造器。
  2. 其次,Sentinel会根据ServiceLoader对SlotChainBuilder进行加载。
  3. 然后,仅会从多个自定义SlotChainBuilder中获取第一个建造器。
  4. 最后,如果没有找到任何自定义的SlotChainBuilder,会使用DefaultSlotChainBuilder作为slot chain的默认建造器。

DefaultSlotChainBuilder#build()

DefaultSlotChainBuilder#build()使用默认的建造器建造slot chain。

@Override
public ProcessorSlotChain build() {
    // 一个默认的{@link ProcessorSlotChain},相当于链
    // Slot相当于链上的槽,各种类型的Slot相当于插入槽的组件
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());
    return chain;
}

ProcessorSlotChain提供了需要进行处理的链,而责任链上的slot则相当于各种可插拔的槽,而NodeSelectorSlot、ClusterBuilderSlot等相当于热插拔的插件,提供实际的处理逻辑。

我们接着回到CtSph#entryWithPriority()方法。

Entry e = new CtEntry(resourceWrapper, chain, context);
try {
    // 在链上申请,相当于过一遍Sentinel
    // 由于链上插满了可扩展的槽,比如流控,降级都是可扩展的槽,此方法相当于按照槽的顺序过滤了一遍
    chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
    // 触发规则限制,抛出异常
    e.exit(count, args);
    throw e1;
} catch (Throwable e1) {
    // Sentinel内部错误,比较少见,记录下来,用于反馈
    RecordLog.info("Sentinel unexpected exception", e1);
}
return e;

在获取到对应resource的slot chain后,Sentinel会直接创建Entry,Entry包含了并发调用时的信息,当前处理的节点,后端服务节点,申请Entry时间等信息。

  1. chain.entry()相当于从责任链的header开始,进行处理。
  2. 如果正常通过责任链,则证明成功通过各种网关拦截,可以执行请求。
  3. 如果捕获到BlockException,证明在进行slot的处理过程中,被规则限制住,不能执行本次请求。
  4. 其他异常属于Sentinel的内部错误,比较少见,记录下来,同时判定通过本次责任链的处理。

ProcessorSlotChain

如果说Sph属于Sentinel的工具类,ProcessorSlotChain就属于Sentinel网关实现的核心。

/**
 * 一些存储过程的容器,以及完成过程时的通知方式
 */
public interface ProcessorSlot<T> {

    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
  • #entry()#exit(),用于进入和退出当前Slot的处理。
  • #fireEntry()#fireExit(),用于开启和关闭下一个Slot,以便调用#entry()#exit()进行处理。

AbstractLinkedProcessorSlot

AbstractLinkedProcessorSlotProcessorSlot的抽闲实现类,用于定义处理链的运转:

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }

}

AbstractLinkedProcessorSlot私有了一个next指针,指向下一个需要处理的slot

那么这个next指针什么时候链起整个slot的呢?

我们在DefaultSlotChainBuilder#build()方法中链通过addLast()的方式添加的,DefaultProcessorSlotChain#addLast()

@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
    // end节点的下一个节点设为给定节点,同时将end节点设置为给定节点
    // 这么写是避免链断掉
    end.setNext(protocolProcessor);
    end = protocolProcessor;
}

每次写入都会将end节点的下一个节点设为给定的ProcessorSlot节点,同时将给定的ProcessorSlot更新为end节点。

AbstractLinkedProcessorSlot#fireEntry()

AbstractLinkedProcessorSlot#fireEntry()用于移动处理链上的指针,相当于:

ProcessorSlotChain#entry()

前文提到,ProcessorSlot#entry()用于请求处理链中的header slot,在创建DefaultProcessorSlotChain时,会默认创建一个空操作的header slot

AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
        super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        super.fireExit(context, resourceWrapper, count, args);
    }

};

可以理解为它是一个仅用于起始的header slot

紧接着,DefaultProcessorSlotChaint提供了如下的调用链:

赞(3) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫