标准 专业
多元 极客

Zuul 2研究院(2)——请求处理原理分析

本篇文章主要对Zuul 2接收请求后的流程进行分析。

1 Filter执行

1.1 执行InboundFilter

我们在处理完所有的HttpInboundSyncFilter后,接下来还会进行一步操作:

private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
    T inMesg = mesg;
    String filterName = "-";
    try {
        Preconditions.checkNotNull(mesg, "Input message");
        int i = runningFilterIdx.get();
        // 执行所有的HttpInboundFilter
        while (i < filters.length) {
            final ZuulFilter<T, T> filter = filters[i];
            filterName = filter.filterName();
            final T outMesg = filter(filter, inMesg);
            if (outMesg == null) {
                return; 
            }
            inMesg = outMesg;
            i = runningFilterIdx.incrementAndGet();
        }

        // 进行下一步操作
        invokeNextStage(inMesg);
    } catch (Exception ex) {
        ...
    }
}
  1. 首先,会执行所有的HttpInboundSyncFilter,也就是我们之前看到过的ZuulFilter,相当于过滤器与拦截器的作用。
  2. 接下来会进入Zuul 2的下一层操作。

我们继续看BaseZuulFilterRunner#invokeNextStage()方法:

protected final void invokeNextStage(final O zuulMesg) {
    if (nextStage != null) {
        // 执行下一步操作
        nextStage.filter(zuulMesg);
    } else {
        // 没有下一步的话,会进行下一次的读
        getChannelHandlerContext(zuulMesg).fireChannelRead(zuulMesg);
    }
}
  1. 执行下一步操作,进行Zuul的请求链路的一系列操作。
  2. 如果没有下一步操作,将继续执行HandlerchannelRead()方法,进行读取信息的操作。

Zuul 2的接下来的请求链就是寻找对应Endpoint

2 寻找后端服务

我们继续看ZuulEndPointRunner#filter()方法:

@Override
public void filter(final HttpRequestMessage zuulReq) {
    // 判断请求存活状态
    if (zuulReq.getContext().isCancelled()) {
        ...
        return;
    }

    // 获取Endpoint类名称
    final String endpointName = getEndPointName(zuulReq.getContext());
    try {
        Preconditions.checkNotNull(zuulReq, "input message");
        // 获取对应的Endpoint
        final ZuulFilter<HttpRequestMessage, HttpResponseMessage> endpoint = getEndpoint(endpointName, zuulReq);
        logger.debug("Got endpoint {}, UUID {}", endpoint.filterName(), zuulReq.getContext().getUUID());
        // 设置Endpoint
        setEndpoint(zuulReq, endpoint);
        // 执行
        final HttpResponseMessage zuulResp = filter(endpoint, zuulReq);

        if ((zuulResp != null) && (!(endpoint instanceof ProxyEndpoint))) {
            // 进行下一步链路操作
            logger.debug("Endpoint calling invokeNextStage, UUID {}", zuulReq.getContext().getUUID());
            invokeNextStage(zuulResp);
        }
    } catch (Exception ex) {
        ...
    }
}
  1. 判断请求存活状态。由于一次请求过程中,NioSocketChannel的处理仅会绑定在同一个WorkerEventLoop线程上,所以这个状态是线程内可见的。
  2. 获取Endpoint类名称。如果我们需要对异常、错误等信息进行额外的处理,会走我们自定义的Endpoint或者默认的endpoint.ErrorResponse
  3. 根据endpointName获取对应的Endpoint
  4. 设置Endpoint
  5. 使用Endpoint处理本次请求。
  6. 在刚刚执行没有返回结果,并且Endpoint类型不是ProxyEndPoint,也就是需要进行错误异常处理的情况下,继续进行ZuulFilterChainRunner的下一步操作。

我们继续看下ZuulEndpointRunner#getEndpoint()方法:

protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> getEndpoint(final String endpointName,
                                                                            final HttpRequestMessage zuulRequest) {
    final SessionContext zuulCtx = zuulRequest.getContext();
    // 静态请求处理
    if (zuulCtx.getStaticResponse() != null) {
        return STATIC_RESPONSE_ENDPOINT;
    }
    // Endpoint名称校验
    if (endpointName == null) {
        return new MissingEndpointHandlingFilter("NO_ENDPOINT_NAME");
    }
    // 创建ProxyEndpoint请求对象
    if (PROXY_ENDPOINT_FILTER_NAME.equals(endpointName)) {
        return newProxyEndpoint(zuulRequest);
    }
    // 获取自定义的Endpoint
    final Endpoint<HttpRequestMessage, HttpResponseMessage> filter = getEndpointFilter(endpointName);
    if (filter == null) {
        return new MissingEndpointHandlingFilter(endpointName);
    }

    return filter;
}
  1. 静态请求处理。
  2. Endpoint名称校验。
  3. 如果Endpoint的名称符合ProxyEndpoint类的名称,创建一个新的Endpoint
  4. 如果不符合Endpoint名称,那么我们会从ZuulFilter中找到对应的Endpoint,如果没有找到,这次请求就会抛出异常。

我们继续看ZuulEndpointRunner#newProxyEndpoint()方法:

protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> newProxyEndpoint(HttpRequestMessage zuulRequest) {
    return new ProxyEndpoint(zuulRequest, getChannelHandlerContext(zuulRequest), getNextStage(), MethodBinding.NO_OP_BINDING);
}
public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContext ctx,
                     final FilterRunner<HttpResponseMessage, ?> filters, MethodBinding<?> methodBinding) {
    this(inMesg, ctx, filters, methodBinding, new NettyRequestAttemptFactory());
}

public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContext ctx,
                     final FilterRunner<HttpResponseMessage, ?> filters, MethodBinding<?> methodBinding,
                     NettyRequestAttemptFactory requestAttemptFactory) {
    channelCtx = ctx;
    responseFilters = filters;
    zuulRequest = transformRequest(inMesg);
    context = zuulRequest.getContext();
    // 获取Origin
    origin = getOrigin(zuulRequest);
    requestAttempts = RequestAttempts.getFromSessionContext(context);
    passport = CurrentPassport.fromSessionContext(context);
    chosenServer = new AtomicReference<>();
    chosenHostAddr = new AtomicReference<>();

    this.sslRetryBodyCache = preCacheBodyForRetryingSslRequests();
    this.populatedSslRetryBody = SpectatorUtils.newCounter("zuul.populated.ssl.retry.body", origin == null ? "null" : origin.getVip());

    this.methodBinding = methodBinding;
    this.requestAttemptFactory = requestAttemptFactory;
}

Endpoint核心组件包括:后端origin集群请求上下文服务器选择等。

3 Origin管理

我们继续看ProxyEndpoint#getOrigin()方法:

protected NettyOrigin getOrigin(HttpRequestMessage request) {
    SessionContext context = request.getContext();
    // 获取Origin的管理器
    OriginManager<NettyOrigin> originManager = (OriginManager<NettyOrigin>) context.get(CommonContextKeys.ORIGIN_MANAGER);
    if (Debug.debugRequest(context)) {
        // debug模式下的日志记录
        ImmutableList.Builder<String> routingLogEntries = (ImmutableList.Builder<String>) context.get(CommonContextKeys.ROUTING_LOG);
        if (routingLogEntries != null) {
            for (String entry : routingLogEntries.build()) {
                Debug.addRequestDebug(context, "RoutingLog: " + entry);
            }
        }
    }
    // 获取当前请求的VIP并校验
    String primaryRoute = context.getRouteVIP();
    if (StringUtils.isEmpty(primaryRoute)) {
        return null;
    }

    String restClientVIP = primaryRoute;
    // VIP名称截取
    boolean useFullName = context.getBoolean(CommonContextKeys.USE_FULL_VIP_NAME);
    String restClientName = useFullName ? restClientVIP : VipUtils.getVIPPrefix(restClientVIP);

    NettyOrigin origin = null;
    if (restClientName != null) {
        // 获取Origin
        origin = getOrCreateOrigin(originManager, restClientName, restClientVIP, request.reconstructURI(), useFullName, context);
    }

    // 自定义VIP处理
    Pair<String, String> customVip = injectCustomVip(request);
    if (customVip != null) {
        restClientVIP = customVip.getLeft();
        restClientName = customVip.getRight();
        origin = getOrCreateOrigin(originManager, restClientName, restClientVIP, request.reconstructURI(), useFullName, context);
    }
    // 校验Origin
    verifyOrigin(context, request, restClientName, origin);

    // 从Origin中更新最新的VIP信息
    if (origin != null) {
        context.set(CommonContextKeys.ACTUAL_VIP, origin.getClientConfig().get(IClientConfigKey.Keys.DeploymentContextBasedVipAddresses));
        context.set(CommonContextKeys.ORIGIN_VIP_SECURE, origin.getClientConfig().get(IClientConfigKey.Keys.IsSecure));
    }

    return origin;
}
  1. 获取Origin管理器,我们在Zuul 2启动源码分析时,有一个注入的Bean是BasicNettyOriginManager,对应的使用地方就在这里。
  2. debug模式下的执行信息记录。
  3. 如果没有VIP地址,直接返回null,接下来会有handleNoOriginSelected()方法对这种没有Origin的问题进行处理。
  4. 进行VIP地址的截取,如果不需要使用完整的VIP名称,Zuul 2会以“:”进行分离。
  5. 如果有请求后端的Origin名称,我们会获取一个Origin
  6. 如果我们是实现自定义的ProxyEndpoint,那么可以自定义一套VIP路由地址。同时也会根据新的VIP地址信息,获取一个新的Origin
  7. 校验Origin。是获取Origin为null的情况下,进行一些特殊的处理。
  8. Origin中获取最新的VIP信息,更新到SessionContext中。

我们继续看ProxyEndpoint#getOrCreateOrigin()方法:

private NettyOrigin getOrCreateOrigin(OriginManager<NettyOrigin> originManager, String name, String vip, String uri, boolean useFullVipName, SessionContext ctx) {
    // ① 从Origin集合中获取Origin
    NettyOrigin origin = originManager.getOrigin(name, vip, uri, ctx);
    if (origin == null) {
        // ② 创建一个新Origin
        origin = originManager.createOrigin(name, vip, uri, useFullVipName, ctx);
    }
    return origin;
}
  1. Origin集合中获取Origin。
  2. 创建一个新的Origin。一般到达这里是异常情况,比如ConcurrentHashMap在扩容等情况,会再尝试创建一次Origin。

我们继续看getOrigin()方法:

@Override
public BasicNettyOrigin getOrigin(String name, String vip, String uri, SessionContext ctx) {
    return originMappings.computeIfAbsent(name, n -> createOrigin(name, vip, uri, false, ctx));
}

获取Origin是是从缓存中获取已有的Origin集群信息。

我们继续看ProxyEndpoint#createOrigin()方法:

@Override
public BasicNettyOrigin createOrigin(String name, String vip, String uri, boolean useFullVipName, SessionContext ctx) {
    return new BasicNettyOrigin(name, vip, registry);
}

BasicNettyOrigin的构造方法:

public BasicNettyOrigin(String name, String vip, Registry registry) {
    this.name = name;
    this.vip = vip;
    this.registry = registry;
    // 负载均衡配置
    this.config = setupClientConfig(name);
    this.clientChannelManager = new DefaultClientChannelManager(name, vip, config, registry);
    // 初始化Origin管理计数器
    this.clientChannelManager.init();
    this.requestAttemptFactory = new NettyRequestAttemptFactory();
	// Origin计数器
    this.concurrentRequests = SpectatorUtils.newGauge("zuul.origin.concurrent.requests", name, new AtomicInteger(0));
    this.rejectedRequests = SpectatorUtils.newCounter("zuul.origin.rejected.requests", name);
    this.concurrencyMax = new CachedDynamicIntProperty("zuul.origin." + name + ".concurrency.max.requests", 200);
    this.concurrencyProtectionEnabled = new CachedDynamicBooleanProperty("zuul.origin." + name + ".concurrency.protect.enabled", true);
}
  1. 负载均衡配置。在我之前的文章中讲过Ribbon负载均衡时,IClientConfig是客户端请求的配置参数。
  2. 初始化一些用于Origin管理的计数器。比如负载均衡等服务和统计方向的计数器。
  3. 初始化Origin本身的一些计数器。比如当前的并发请求数,拒绝请求数,并发请求阈值以及是否进行并发保护。

我们继续看DefaultClientChannelManager的构造方法:

public DefaultClientChannelManager(String originName, String vip, IClientConfig clientConfig, Registry spectatorRegistry) {
    // 创建负载均衡器
    this.loadBalancer = createLoadBalancer(clientConfig);

    this.vip = vip;
    this.clientConfig = clientConfig;
    this.spectatorRegistry = spectatorRegistry;
    this.perServerPools = new ConcurrentHashMap<>(200);

    // 添加负载均衡监听器
    this.loadBalancer.addServerListChangeListener((oldList, newList) -> removeMissingServerConnectionPools(oldList, newList));

    this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig);
	// 计数器
    this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", originName);
    this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", originName);
    this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", originName);

    this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", originName);
    this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", originName);
    this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", originName);
    this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", originName);
    this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", originName);
    this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", originName);
    this.maxConnsPerHostExceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", originName);
    this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", originName);
    this.connEstablishTimer = PercentileTimer.get(spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", originName));
    this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", originName, new AtomicInteger());
    this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", originName, new AtomicInteger());
}
  1. 创建负载均衡器。如果我们没有指定使用的负载均衡器,默认使用ZoneAwareLoadBalancer负载均衡器,ZoneAwareLoadBalancer的效果请查阅我之前关于Ribbon的文章。
  2. 添加负载均衡监听器。当集群服务发生变化时,会自动的更新后端服务列表。
  3. 一系列的请求、时间程序计数器

一个Origin集群就已经创建完毕了。

我们继续回到ZuulEndpointRunner#filter()方法,我们已经执行完getEndpoint()方法:

public static void setEndpoint(HttpRequestMessage zuulReq, ZuulFilter<HttpRequestMessage, HttpResponseMessage> endpoint) {
    zuulReq.getContext().set(ZUUL_ENDPOINT, endpoint);
}

filter的处理方式就是在SessionContext中放入对象,便于下一个Filter进行处理,我们继续看ZuuleEndpointRunner的处理,由于ZuulEndpointRunner没有实现filter,我们看下它的父类ZuulEndpointRunner#filter()实现:

protected final O filter(final ZuulFilter<I, O> filter, final I inMesg) {
    final long startTime = System.currentTimeMillis();
    final ZuulMessage snapshot = inMesg.getContext().debugRouting() ? inMesg.clone() : null;
    FilterChainResumer resumer = null;

    try {
        ExecutionStatus filterRunStatus = null;
        // 状态判断
        ...

        // body释放
        inMesg.runBufferedBodyContentThroughFilter(filter);
        // 同步处理
        if (filter.getSyncType() == FilterSyncType.SYNC) {
            final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
            final O outMesg = syncFilter.apply(inMesg);
            recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
            return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
        }

        // 异步处理
        filter.incrementConcurrency();
        resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
        filter.applyAsync(inMesg)
                .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                .doOnUnsubscribe(resumer::decrementConcurrency)
                .subscribe(resumer);

        return null;
    } catch (Throwable t) {
        if (resumer != null) {
            resumer.decrementConcurrency();
        }
        final O outMesg = handleFilterException(inMesg, filter, t);
        outMesg.finishBufferedBodyIfIncomplete();
        recordFilterCompletion(FAILED, filter, startTime, inMesg, snapshot);
        return outMesg;
    }
}
  1. 请求前一系列的状态判断。
  2. 释放之前filter对HttpBody的修改,更新Body。
  3. 接下来需要根据同步、异步请求分情况进行处理。

由于ProxyEndPoint是同步处理的filter,所以我们看下它的同步处理过程,ProxyEndpoint#filter()

@Override
public HttpResponseMessage apply(final HttpRequestMessage input) {
    try {
		// 校验及时间记录
        if (origin == null) {
            handleNoOriginSelected();
            return null;
        }

        origin.getProxyTiming(zuulRequest).start();
        IClientConfig requestConfig = origin.getExecutionContext(zuulRequest).getRequestConfig();
        originalReadTimeout = requestConfig.getProperty(ReadTimeout, null);
        setReadTimeoutOnContext(requestConfig, 1);

        origin.onRequestExecutionStart(zuulRequest);
        // 执行请求
        proxyRequestToOrigin();

    	// 这里不会返回执行结果,由invokeNext()触发响应请求
        return null;
    } catch (Exception ex) {
        handleError(ex);
        return null;
    }
}
  1. 校验Origin及时间记录。
  2. 执行Http请求。

这里不会返回执行结果,将由invokeNext()触发响应请求。

我们继续看下ProxyEndpoint#proxyRequestToOrigin()方法:

private void proxyRequestToOrigin() {
    Promise<PooledConnection> promise = null;
    try {
        attemptNum += 1;
        requestStat = createRequestStat();
        origin.preRequestChecks(zuulRequest);
        concurrentReqCount++;

        // 进行HTTP的请求
        promise = origin.connectToOrigin(zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr);
		// 信息记录
        storeAndLogOriginRequestInfo();
        currentRequestAttempt = origin.newRequestAttempt(chosenServer.get(), context, attemptNum);
        requestAttempts.add(currentRequestAttempt);
        passport.add(PassportState.ORIGIN_CONN_ACQUIRE_START);

        if (promise.isDone()) {
            operationComplete(promise);
        } else {
            promise.addListener(this);
        }
    } catch (Exception ex) {
        ...
    }
}
  1. 建立和负载均衡器获取的后端服务的连接,传递HTTP请求。
  2. 记录本次请求的信息。

4 总结

  • Zuul 2接收到前端请求后,首先会经过HttpInboundSyncFilter,也就是ZuulFilter对请求进行过滤,我们可以使用ZuulFilter设置请求的后端服务集群名称,也就是VIP名称。
  • Zuul 2使用Origin来管理每一个后端服务集群,每个Origin都拥有一个负载均衡器和计数器。
  • 负载均衡器负责更新当前VIP名称下的最新服务列表,并根据负载均衡策略,为每次请求提供指定的Endpoint。
  • 计数器用于每次请求及请求的结果。
  • Zuul 2由于采用的是Netty模式,每个Worker线程负责和指定Endpoint建立长连接。
赞(2) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫