本篇文章主要对Zuul 2接收请求后的流程进行分析。
1 Filter执行
1.1 执行InboundFilter
我们在处理完所有的HttpInboundSyncFilter后,接下来还会进行一步操作:
private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
T inMesg = mesg;
String filterName = "-";
try {
Preconditions.checkNotNull(mesg, "Input message");
int i = runningFilterIdx.get();
// 执行所有的HttpInboundFilter
while (i < filters.length) {
final ZuulFilter<T, T> filter = filters[i];
filterName = filter.filterName();
final T outMesg = filter(filter, inMesg);
if (outMesg == null) {
return;
}
inMesg = outMesg;
i = runningFilterIdx.incrementAndGet();
}
// 进行下一步操作
invokeNextStage(inMesg);
} catch (Exception ex) {
...
}
}
- 首先,会执行所有的HttpInboundSyncFilter,也就是我们之前看到过的ZuulFilter,相当于过滤器与拦截器的作用。
- 接下来会进入Zuul 2的下一层操作。
我们继续看BaseZuulFilterRunner#invokeNextStage()方法:
protected final void invokeNextStage(final O zuulMesg) {
if (nextStage != null) {
// 执行下一步操作
nextStage.filter(zuulMesg);
} else {
// 没有下一步的话,会进行下一次的读
getChannelHandlerContext(zuulMesg).fireChannelRead(zuulMesg);
}
}
- 执行下一步操作,进行Zuul的请求链路的一系列操作。
- 如果没有下一步操作,将继续执行Handler的channelRead()方法,进行读取信息的操作。
Zuul 2的接下来的请求链就是寻找对应Endpoint。
2 寻找后端服务
我们继续看ZuulEndPointRunner#filter()方法:
@Override
public void filter(final HttpRequestMessage zuulReq) {
// 判断请求存活状态
if (zuulReq.getContext().isCancelled()) {
...
return;
}
// 获取Endpoint类名称
final String endpointName = getEndPointName(zuulReq.getContext());
try {
Preconditions.checkNotNull(zuulReq, "input message");
// 获取对应的Endpoint
final ZuulFilter<HttpRequestMessage, HttpResponseMessage> endpoint = getEndpoint(endpointName, zuulReq);
logger.debug("Got endpoint {}, UUID {}", endpoint.filterName(), zuulReq.getContext().getUUID());
// 设置Endpoint
setEndpoint(zuulReq, endpoint);
// 执行
final HttpResponseMessage zuulResp = filter(endpoint, zuulReq);
if ((zuulResp != null) && (!(endpoint instanceof ProxyEndpoint))) {
// 进行下一步链路操作
logger.debug("Endpoint calling invokeNextStage, UUID {}", zuulReq.getContext().getUUID());
invokeNextStage(zuulResp);
}
} catch (Exception ex) {
...
}
}
- 判断请求存活状态。由于一次请求过程中,NioSocketChannel的处理仅会绑定在同一个WorkerEventLoop线程上,所以这个状态是线程内可见的。
- 获取Endpoint类名称。如果我们需要对异常、错误等信息进行额外的处理,会走我们自定义的Endpoint或者默认的endpoint.ErrorResponse
- 根据endpointName获取对应的Endpoint。
- 设置Endpoint。
- 使用Endpoint处理本次请求。
- 在刚刚执行没有返回结果,并且Endpoint类型不是ProxyEndPoint,也就是需要进行错误异常处理的情况下,继续进行ZuulFilterChainRunner的下一步操作。
我们继续看下ZuulEndpointRunner#getEndpoint()方法:
protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> getEndpoint(final String endpointName,
final HttpRequestMessage zuulRequest) {
final SessionContext zuulCtx = zuulRequest.getContext();
// 静态请求处理
if (zuulCtx.getStaticResponse() != null) {
return STATIC_RESPONSE_ENDPOINT;
}
// Endpoint名称校验
if (endpointName == null) {
return new MissingEndpointHandlingFilter("NO_ENDPOINT_NAME");
}
// 创建ProxyEndpoint请求对象
if (PROXY_ENDPOINT_FILTER_NAME.equals(endpointName)) {
return newProxyEndpoint(zuulRequest);
}
// 获取自定义的Endpoint
final Endpoint<HttpRequestMessage, HttpResponseMessage> filter = getEndpointFilter(endpointName);
if (filter == null) {
return new MissingEndpointHandlingFilter(endpointName);
}
return filter;
}
- 静态请求处理。
- Endpoint名称校验。
- 如果Endpoint的名称符合ProxyEndpoint类的名称,创建一个新的Endpoint。
- 如果不符合Endpoint名称,那么我们会从ZuulFilter中找到对应的Endpoint,如果没有找到,这次请求就会抛出异常。
我们继续看ZuulEndpointRunner#newProxyEndpoint()方法:
protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> newProxyEndpoint(HttpRequestMessage zuulRequest) {
return new ProxyEndpoint(zuulRequest, getChannelHandlerContext(zuulRequest), getNextStage(), MethodBinding.NO_OP_BINDING);
}
public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContext ctx,
final FilterRunner<HttpResponseMessage, ?> filters, MethodBinding<?> methodBinding) {
this(inMesg, ctx, filters, methodBinding, new NettyRequestAttemptFactory());
}
public ProxyEndpoint(final HttpRequestMessage inMesg, final ChannelHandlerContext ctx,
final FilterRunner<HttpResponseMessage, ?> filters, MethodBinding<?> methodBinding,
NettyRequestAttemptFactory requestAttemptFactory) {
channelCtx = ctx;
responseFilters = filters;
zuulRequest = transformRequest(inMesg);
context = zuulRequest.getContext();
// 获取Origin
origin = getOrigin(zuulRequest);
requestAttempts = RequestAttempts.getFromSessionContext(context);
passport = CurrentPassport.fromSessionContext(context);
chosenServer = new AtomicReference<>();
chosenHostAddr = new AtomicReference<>();
this.sslRetryBodyCache = preCacheBodyForRetryingSslRequests();
this.populatedSslRetryBody = SpectatorUtils.newCounter("zuul.populated.ssl.retry.body", origin == null ? "null" : origin.getVip());
this.methodBinding = methodBinding;
this.requestAttemptFactory = requestAttemptFactory;
}
Endpoint核心组件包括:后端origin集群,请求上下文,服务器选择等。
3 Origin管理
我们继续看ProxyEndpoint#getOrigin()方法:
protected NettyOrigin getOrigin(HttpRequestMessage request) {
SessionContext context = request.getContext();
// 获取Origin的管理器
OriginManager<NettyOrigin> originManager = (OriginManager<NettyOrigin>) context.get(CommonContextKeys.ORIGIN_MANAGER);
if (Debug.debugRequest(context)) {
// debug模式下的日志记录
ImmutableList.Builder<String> routingLogEntries = (ImmutableList.Builder<String>) context.get(CommonContextKeys.ROUTING_LOG);
if (routingLogEntries != null) {
for (String entry : routingLogEntries.build()) {
Debug.addRequestDebug(context, "RoutingLog: " + entry);
}
}
}
// 获取当前请求的VIP并校验
String primaryRoute = context.getRouteVIP();
if (StringUtils.isEmpty(primaryRoute)) {
return null;
}
String restClientVIP = primaryRoute;
// VIP名称截取
boolean useFullName = context.getBoolean(CommonContextKeys.USE_FULL_VIP_NAME);
String restClientName = useFullName ? restClientVIP : VipUtils.getVIPPrefix(restClientVIP);
NettyOrigin origin = null;
if (restClientName != null) {
// 获取Origin
origin = getOrCreateOrigin(originManager, restClientName, restClientVIP, request.reconstructURI(), useFullName, context);
}
// 自定义VIP处理
Pair<String, String> customVip = injectCustomVip(request);
if (customVip != null) {
restClientVIP = customVip.getLeft();
restClientName = customVip.getRight();
origin = getOrCreateOrigin(originManager, restClientName, restClientVIP, request.reconstructURI(), useFullName, context);
}
// 校验Origin
verifyOrigin(context, request, restClientName, origin);
// 从Origin中更新最新的VIP信息
if (origin != null) {
context.set(CommonContextKeys.ACTUAL_VIP, origin.getClientConfig().get(IClientConfigKey.Keys.DeploymentContextBasedVipAddresses));
context.set(CommonContextKeys.ORIGIN_VIP_SECURE, origin.getClientConfig().get(IClientConfigKey.Keys.IsSecure));
}
return origin;
}
- 获取Origin管理器,我们在Zuul 2启动源码分析时,有一个注入的Bean是BasicNettyOriginManager,对应的使用地方就在这里。
- debug模式下的执行信息记录。
- 如果没有VIP地址,直接返回null,接下来会有handleNoOriginSelected()方法对这种没有Origin的问题进行处理。
- 进行VIP地址的截取,如果不需要使用完整的VIP名称,Zuul 2会以“:”进行分离。
- 如果有请求后端的Origin名称,我们会获取一个Origin。
- 如果我们是实现自定义的ProxyEndpoint,那么可以自定义一套VIP路由地址。同时也会根据新的VIP地址信息,获取一个新的Origin。
- 校验Origin。是获取Origin为null的情况下,进行一些特殊的处理。
- 从Origin中获取最新的VIP信息,更新到SessionContext中。
我们继续看ProxyEndpoint#getOrCreateOrigin()方法:
private NettyOrigin getOrCreateOrigin(OriginManager<NettyOrigin> originManager, String name, String vip, String uri, boolean useFullVipName, SessionContext ctx) {
// ① 从Origin集合中获取Origin
NettyOrigin origin = originManager.getOrigin(name, vip, uri, ctx);
if (origin == null) {
// ② 创建一个新Origin
origin = originManager.createOrigin(name, vip, uri, useFullVipName, ctx);
}
return origin;
}
- 从Origin集合中获取Origin。
- 创建一个新的Origin。一般到达这里是异常情况,比如ConcurrentHashMap在扩容等情况,会再尝试创建一次Origin。
我们继续看getOrigin()方法:
@Override
public BasicNettyOrigin getOrigin(String name, String vip, String uri, SessionContext ctx) {
return originMappings.computeIfAbsent(name, n -> createOrigin(name, vip, uri, false, ctx));
}
获取Origin是是从缓存中获取已有的Origin集群信息。
我们继续看ProxyEndpoint#createOrigin()方法:
@Override
public BasicNettyOrigin createOrigin(String name, String vip, String uri, boolean useFullVipName, SessionContext ctx) {
return new BasicNettyOrigin(name, vip, registry);
}
BasicNettyOrigin的构造方法:
public BasicNettyOrigin(String name, String vip, Registry registry) {
this.name = name;
this.vip = vip;
this.registry = registry;
// 负载均衡配置
this.config = setupClientConfig(name);
this.clientChannelManager = new DefaultClientChannelManager(name, vip, config, registry);
// 初始化Origin管理计数器
this.clientChannelManager.init();
this.requestAttemptFactory = new NettyRequestAttemptFactory();
// Origin计数器
this.concurrentRequests = SpectatorUtils.newGauge("zuul.origin.concurrent.requests", name, new AtomicInteger(0));
this.rejectedRequests = SpectatorUtils.newCounter("zuul.origin.rejected.requests", name);
this.concurrencyMax = new CachedDynamicIntProperty("zuul.origin." + name + ".concurrency.max.requests", 200);
this.concurrencyProtectionEnabled = new CachedDynamicBooleanProperty("zuul.origin." + name + ".concurrency.protect.enabled", true);
}
- 负载均衡配置。在我之前的文章中讲过Ribbon负载均衡时,IClientConfig是客户端请求的配置参数。
- 初始化一些用于Origin管理的计数器。比如负载均衡等服务和统计方向的计数器。
- 初始化Origin本身的一些计数器。比如当前的并发请求数,拒绝请求数,并发请求阈值以及是否进行并发保护。
我们继续看DefaultClientChannelManager的构造方法:
public DefaultClientChannelManager(String originName, String vip, IClientConfig clientConfig, Registry spectatorRegistry) {
// 创建负载均衡器
this.loadBalancer = createLoadBalancer(clientConfig);
this.vip = vip;
this.clientConfig = clientConfig;
this.spectatorRegistry = spectatorRegistry;
this.perServerPools = new ConcurrentHashMap<>(200);
// 添加负载均衡监听器
this.loadBalancer.addServerListChangeListener((oldList, newList) -> removeMissingServerConnectionPools(oldList, newList));
this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig);
// 计数器
this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", originName);
this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", originName);
this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", originName);
this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", originName);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", originName);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", originName);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", originName);
this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", originName);
this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", originName);
this.maxConnsPerHostExceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", originName);
this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", originName);
this.connEstablishTimer = PercentileTimer.get(spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", originName));
this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", originName, new AtomicInteger());
this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", originName, new AtomicInteger());
}
- 创建负载均衡器。如果我们没有指定使用的负载均衡器,默认使用ZoneAwareLoadBalancer负载均衡器,ZoneAwareLoadBalancer的效果请查阅我之前关于Ribbon的文章。
- 添加负载均衡监听器。当集群服务发生变化时,会自动的更新后端服务列表。
- 一系列的请求、时间程序计数器。
一个Origin集群就已经创建完毕了。
我们继续回到ZuulEndpointRunner#filter()方法,我们已经执行完getEndpoint()方法:
public static void setEndpoint(HttpRequestMessage zuulReq, ZuulFilter<HttpRequestMessage, HttpResponseMessage> endpoint) {
zuulReq.getContext().set(ZUUL_ENDPOINT, endpoint);
}
filter的处理方式就是在SessionContext中放入对象,便于下一个Filter进行处理,我们继续看ZuuleEndpointRunner的处理,由于ZuulEndpointRunner没有实现filter,我们看下它的父类ZuulEndpointRunner#filter()实现:
protected final O filter(final ZuulFilter<I, O> filter, final I inMesg) {
final long startTime = System.currentTimeMillis();
final ZuulMessage snapshot = inMesg.getContext().debugRouting() ? inMesg.clone() : null;
FilterChainResumer resumer = null;
try {
ExecutionStatus filterRunStatus = null;
// 状态判断
...
// body释放
inMesg.runBufferedBodyContentThroughFilter(filter);
// 同步处理
if (filter.getSyncType() == FilterSyncType.SYNC) {
final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
final O outMesg = syncFilter.apply(inMesg);
recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
}
// 异步处理
filter.incrementConcurrency();
resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
filter.applyAsync(inMesg)
.observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
.doOnUnsubscribe(resumer::decrementConcurrency)
.subscribe(resumer);
return null;
} catch (Throwable t) {
if (resumer != null) {
resumer.decrementConcurrency();
}
final O outMesg = handleFilterException(inMesg, filter, t);
outMesg.finishBufferedBodyIfIncomplete();
recordFilterCompletion(FAILED, filter, startTime, inMesg, snapshot);
return outMesg;
}
}
- 请求前一系列的状态判断。
- 释放之前filter对HttpBody的修改,更新Body。
- 接下来需要根据同步、异步请求分情况进行处理。
由于ProxyEndPoint是同步处理的filter,所以我们看下它的同步处理过程,ProxyEndpoint#filter():
@Override
public HttpResponseMessage apply(final HttpRequestMessage input) {
try {
// 校验及时间记录
if (origin == null) {
handleNoOriginSelected();
return null;
}
origin.getProxyTiming(zuulRequest).start();
IClientConfig requestConfig = origin.getExecutionContext(zuulRequest).getRequestConfig();
originalReadTimeout = requestConfig.getProperty(ReadTimeout, null);
setReadTimeoutOnContext(requestConfig, 1);
origin.onRequestExecutionStart(zuulRequest);
// 执行请求
proxyRequestToOrigin();
// 这里不会返回执行结果,由invokeNext()触发响应请求
return null;
} catch (Exception ex) {
handleError(ex);
return null;
}
}
- 校验Origin及时间记录。
- 执行Http请求。
这里不会返回执行结果,将由invokeNext()触发响应请求。
我们继续看下ProxyEndpoint#proxyRequestToOrigin()方法:
private void proxyRequestToOrigin() {
Promise<PooledConnection> promise = null;
try {
attemptNum += 1;
requestStat = createRequestStat();
origin.preRequestChecks(zuulRequest);
concurrentReqCount++;
// 进行HTTP的请求
promise = origin.connectToOrigin(zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr);
// 信息记录
storeAndLogOriginRequestInfo();
currentRequestAttempt = origin.newRequestAttempt(chosenServer.get(), context, attemptNum);
requestAttempts.add(currentRequestAttempt);
passport.add(PassportState.ORIGIN_CONN_ACQUIRE_START);
if (promise.isDone()) {
operationComplete(promise);
} else {
promise.addListener(this);
}
} catch (Exception ex) {
...
}
}
- 建立和负载均衡器获取的后端服务的连接,传递HTTP请求。
- 记录本次请求的信息。
4 总结
- Zuul 2接收到前端请求后,首先会经过HttpInboundSyncFilter,也就是ZuulFilter对请求进行过滤,我们可以使用ZuulFilter设置请求的后端服务集群名称,也就是VIP名称。
- Zuul 2使用Origin来管理每一个后端服务集群,每个Origin都拥有一个负载均衡器和计数器。
- 负载均衡器负责更新当前VIP名称下的最新服务列表,并根据负载均衡策略,为每次请求提供指定的Endpoint。
- 计数器用于每次请求及请求的结果。
- Zuul 2由于采用的是Netty模式,每个Worker线程负责和指定Endpoint建立长连接。