我们在上一篇文章对请求如何寻找Origin,Origin的转发流程进行了解析。
本篇文章我们将继续分析Origin的请求以及Zuul 2对请求响应的处理。
建立和后端服务的连接
我们在处理请求的时候调用的是BasicNettyOrigin#connectToOrigin()方法:
@Override
public Promise<PooledConnection> connectToOrigin(HttpRequestMessage zuulReq, EventLoop eventLoop, int attemptNumber, CurrentPassport passport, AtomicReference<Server> chosenServer, AtomicReference<String> chosenHostAddr) {
return clientChannelManager.acquire(eventLoop, null, zuulReq.getMethod().toUpperCase(),
zuulReq.getPath(), attemptNumber, passport, chosenServer, chosenHostAddr);
}
我们会使用给当前Origin配置的全局DefaultClientChannelManager做一个类似资源获取的操作:
@Override
public Promise<PooledConnection> acquire(final EventLoop eventLoop, final Object key, final String httpMethod, final String uri, final int attemptNum, final CurrentPassport passport, final AtomicReference<Server> selectedServer, final AtomicReference<String> selectedHostAdddr) {
// 计数校验
if (attemptNum < 1) {
throw new IllegalArgumentException("attemptNum must be greater than zero");
}
// 判断Origin的服务状态
if (shuttingDown) {
Promise<PooledConnection> promise = eventLoop.newPromise();
promise.setFailure(SHUTTING_DOWN_ERR);
return promise;
}
// 选取下一个Server
final Server chosenServer = loadBalancer.chooseServer(key);
if (chosenServer == null) {
Promise<PooledConnection> promise = eventLoop.newPromise();
promise.setFailure(new OriginConnectException("No servers available", OutboundErrorType.NO_AVAILABLE_SERVERS));
return promise;
}
// 创建一个统一的Instance实例
final InstanceInfo instanceInfo = chosenServer instanceof DiscoveryEnabledServer ?
((DiscoveryEnabledServer) chosenServer).getInstanceInfo() :
// create mock instance info for non-discovery instances
new InstanceInfo(chosenServer.getId(), null, null, chosenServer.getHost(), chosenServer.getId(),
null, null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null, null, null, null, null, null);
// 更新Origin中的choserServer
selectedServer.set(chosenServer);
// 获取连接池
IConnectionPool pool = perServerPools.computeIfAbsent(chosenServer, s -> {
LoadBalancerStats lbStats = loadBalancer.getLoadBalancerStats();
ServerStats stats = lbStats.getSingleServerStat(chosenServer);
final ClientChannelManager clientChannelMgr = this;
PooledConnectionFactory pcf = createPooledConnectionFactory(chosenServer, instanceInfo, stats, clientChannelMgr, closeConnCounter, closeWrtBusyConnCounter);
return createConnectionPool(chosenServer, stats, instanceInfo, clientConnFactory, pcf, connPoolConfig,
clientConfig, createNewConnCounter, createConnSucceededCounter, createConnFailedCounter,
requestConnCounter, reuseConnCounter, connTakenFromPoolIsNotOpen, maxConnsPerHostExceededCounter,
connEstablishTimer, connsInPool, connsInUse);
});
// ⑦ 连接池进行操作
return pool.acquire(eventLoop, null, httpMethod, uri, attemptNum, passport, selectedHostAdddr);
}
- 请求计数校验。
- 判断Origin的服务状态。
- 使用负载均衡器选取下一个Server。我们在这里会根据负载均衡算法,获取一个可用服务节点。
- 根据选取的服务器,创建服务实例。
- 更新选取的服务节点,也就说我们在之前Origin中看到的chosenServer,其实是上次请求调用的服务节点,传递到这里的目的在于进行实时更新。
- 获取请求连接池。
- 进行请求操作。
我们继续看PerServerConnectionPool#acquire()方法:
@Override
public Promise<PooledConnection> acquire(EventLoop eventLoop, Object key, String httpMethod, String uri,
int attemptNum, CurrentPassport passport,
AtomicReference<String> selectedHostAddr) {
// 计数器操作
requestConnCounter.increment();
stats.incrementActiveRequestsCount();
// 创建Promise对象
Promise<PooledConnection> promise = eventLoop.newPromise();
// 从连接池中获取连接
final PooledConnection conn = tryGettingFromConnectionPool(eventLoop);
if (conn != null) {
// 执行请求
conn.startRequestTimer();
conn.incrementUsageCount();
conn.getChannel().read();
onAcquire(conn, httpMethod, uri, attemptNum, passport);
promise.setSuccess(conn);
selectedHostAddr.set(getHostFromServer(conn.getServer()));
} else {
// 使用新连接执行请求
tryMakingNewConnection(eventLoop, promise, httpMethod, uri, attemptNum, passport, selectedHostAddr);
}
return promise;
}
- 进行计数器操作。
- 根据整个请求链路中,全局的WorkerEventLoop线程创建Promise对象。
- 从连接池中获取可用连接。这里会使用WorkerEvertLoop作为标识创建类型为ConcurrentLinkedDeque的连接队列。
- 在获取到可用连接的情况下,使用该连接进行处理请求,我们会发现它也是通过channel→channelRead()方式,进行响应的回调,这种连接队列的处理实际上是对WorkerEvertLoop的一种解放。
- 如果没有从连接池中获取到连接,那么我们会创建一个连接来执行请求。
在原理分析过程中发现:连接池一直没有放入连接的行为,所有请求都会走到tryMakingNewConnection()方法中。
个人理解为,我们可以在自己的实现中对连接池进行优化。
我们继续来看PerServerConnectionPool#tryMakingNewConnection()方法:
protected void tryMakingNewConnection(final EventLoop eventLoop, final Promise<PooledConnection> promise, final String httpMethod, final String uri, final int attemptNum, final CurrentPassport passport, final AtomicReference<String> selectedHostAddr) {
// 限流处理
int maxConnectionsPerHost = config.maxConnectionsPerHost();
int openAndOpeningConnectionCount = stats.getOpenConnectionsCount() + connCreationsInProgress.get();
if (maxConnectionsPerHost != -1 && openAndOpeningConnectionCount >= maxConnectionsPerHost) {
maxConnsPerHostExceededCounter.increment();
promise.setFailure(new OriginConnectException(
"maxConnectionsPerHost=" + maxConnectionsPerHost + ", connectionsPerHost=" + openAndOpeningConnectionCount,
OutboundErrorType.ORIGIN_SERVER_MAX_CONNS));
LOG.warn("Unable to create new connection because at MaxConnectionsPerHost! "
+ "maxConnectionsPerHost=" + maxConnectionsPerHost
+ ", connectionsPerHost=" + openAndOpeningConnectionCount
+ ", host=" + instanceInfo.getId()
+ "origin=" + config.getOriginName()
);
return;
}
// 请求计时器
Timing timing = startConnEstablishTimer();
try {
createNewConnCounter.increment();
connCreationsInProgress.incrementAndGet();
passport.add(PassportState.ORIGIN_CH_CONNECTING);
// 获取服务器Host地址
String host = getHostFromServer(server);
selectedHostAddr.set(host);
// 执行请求
final ChannelFuture cf = connectionFactory.connect(eventLoop, host, server.getPort(), passport);
if (cf.isDone()) {
endConnEstablishTimer(timing);
handleConnectCompletion(cf, promise, httpMethod, uri, attemptNum,
passport);
} else {
cf.addListener(future -> {
try {
endConnEstablishTimer(timing);
handleConnectCompletion((ChannelFuture) future, promise, httpMethod, uri, attemptNum,
passport);
} catch (Throwable e) {
if (!promise.isDone()) {
promise.setFailure(e);
}
LOG.warn("Error creating new connection! "
+ "origin=" + config.getOriginName()
+ ", host=" + instanceInfo.getId()
);
}
});
}
} catch (Throwable e) {
endConnEstablishTimer(timing);
promise.setFailure(e);
}
}
- 限流处理,会对当前准备打开以及正在处理的线程之和与连接池的最大容量进行对比,如果超过连接池的最大容量,直接返回OriginConnectException。
- 请求计数器,包括HTTP请求的时间记录器和当前连接池使用的计数器。
- 获取服务器的Host地址,一般是服务节点的IP:PORT地址。
- 执行请求。
在执行请求时,我们会发现使用的是我们在创建BasicNettyOrigin,初始化DefaultClientChannelManager时,创建的Netty客户端连接工厂。
@Override
public void init() {
// 初始化Origin与后端服务建立连接的Netty服务
this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, spectatorRegistry);
this.clientConnFactory = createNettyClientConnectionFactory(connPoolConfig, channelInitializer);
}
在初始化过程中,我们也对这个Netty客户端进行了Origin的定制化处理,专属的ChannelInitializer,ChannelPipeline等。
而我们在限流处理中看到的连接池大小比对,不是刚刚准备处理请求的连接池大小比对,而是在初始化DefaultClientChannelManager时,创建的用于管理此时这个Netty客户端的配置管理信息。
也就是说,我们现在看到这个NettyClientConnectionFactory的限流管理,依托于ConnectionPoolConfigImpl具体实现。
我们继续看NettyClientConnectionFactory#connect():
public ChannelFuture connect(final EventLoop eventLoop, String host, final int port, CurrentPassport passport) {
Class socketChannelClass;
if (Server.USE_EPOLL.get()) {
socketChannelClass = EpollSocketChannel.class;
} else {
socketChannelClass = NioSocketChannel.class;
}
SocketAddress socketAddress = new InetSocketAddress(host, port);
final Bootstrap bootstrap = new Bootstrap()
.channel(socketChannelClass)
.handler(channelInitializer)
.group(eventLoop)
.attr(CurrentPassport.CHANNEL_ATTR, passport)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connPoolConfig.getConnectTimeout())
.option(ChannelOption.SO_KEEPALIVE, connPoolConfig.getTcpKeepAlive())
.option(ChannelOption.TCP_NODELAY, connPoolConfig.getTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, connPoolConfig.getTcpSendBufferSize())
.option(ChannelOption.SO_RCVBUF, connPoolConfig.getTcpReceiveBufferSize())
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, connPoolConfig.getNettyWriteBufferHighWaterMark())
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, connPoolConfig.getNettyWriteBufferLowWaterMark())
.option(ChannelOption.AUTO_READ, connPoolConfig.getNettyAutoRead())
.remoteAddress(socketAddress);
ZuulBootstrap zuulBootstrap = new ZuulBootstrap(bootstrap);
if (!zuulBootstrap.getResolver(eventLoop).isResolved(socketAddress)) {
LOGGER.warn("NettyClientConnectionFactory got an unresolved server address, host: " + host + ", port: " + port);
unresolvedDiscoveryHost.increment();
}
return bootstrap.connect();
}
此时到了Zuul 2作为网关,转发请求的最底层实现。
而我们还会继续使用当前WorkerEventLoop线程完成此次的请求转发,bootstrap.connect()也是完成了对指定后端服务节点的一次基于Channel的写执行。
总结
- 与后端服务建立请求,是在每个Origin集群的管理者DefaultClientChannelManager中维护了一个Netty服务。
- 创建连接时,将会使用当前处理请求的WorkerEventLoop线程。
- 请求连接也可以进行池化处理。