在进行Spring Cloud Alibaba Sentinel服务注册源码分析时,我们已经得出结论:
- spring-cloud-alibaba-sentinel默认是懒加载的。
- 控制懒加载的参数是spring.cloud.sentinel.eager,默认是false。
- 立即加载是发生在SentinelAutoConfiguration中。
我们在实验过程中会发现,当应用服务启动时,我们需要触发下一下Sentinel资源,对应的应用信息,才会显示在Sentinel控制台中。
Sentinel加载之后,又做了一些什么呢?
@SentinelResource的切面
沿着实验过程顺藤摸瓜,我们可以找到@SentinelResource的切面和切点:
@Configuration
public class AopConfiguration {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
// ① 解析当前方法
Method originMethod = resolveMethod(pjp);
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
// ② 注解校验
if (annotation == null) {
...
}
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
Entry entry = null;
try {
// ③ 请求上报
entry = SphU.entry(resourceName, entryType, 1, pjp.getArgs());
Object result = pjp.proceed();
return result;
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// ④ 异常规避处理
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
// ⑤ 异常追踪处理
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex, annotation);
return handleFallback(pjp, annotation, ex);
}
// ⑥ 直接抛出异常
throw ex;
} finally {
if (entry != null) {
// ⑦请求结束,计数减1
entry.exit(1, pjp.getArgs());
}
}
}
}
- 解析使用@SentinelResource注解的方法,获取相应的方法信息。
- 进行@SentinelResource注解校验。
- 使用Sentinel进行请求上报。
- 异常规避处理。
- 异常追踪处理。
- 直接抛出异常。
- 请求完成后,对应的规则计数器减一。
接下来我们看SphU#entry()静态方法:
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
return Env.sph.entry(name, type, count, args);
}
我们发现存在一个Env的单例对象:
public class Env {
public static final Sph sph = new CtSph();
static {
// 如果初始化失败,该进程会关闭
InitExecutor.doInit();
}
}
我们在Env的静态初始化方法中,看到了关键信息InitExecutor#doInit():
public static void doInit() {
// ① 乐观锁
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// ② 加载InitFunc接口
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
// ③ 调整初始化顺序
insertSorted(initList, initFunc);
}
// ④ 按顺序进行初始化
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
...
} catch (Error error) {
...
}
}
- 使用乐观锁对初始化过程进行加锁,代表着一次启动后仅会加载一次。
- 获取InitFunc.class接口类型的SPI类。
- 将需要初始化的类用OrderWrapper包装一层,并且根据InitOrder调整初始化顺序。
- 按顺序进行初始化。
我们来看一下默认的几个初始化类。
InitExecutor#doInit()Sentinel服务注册中的立即加载,有异曲同工之妙。
Sentinel的SPI
CommandCenterInitFunc
初始化命令中心
这是上报功能的关键对象
@Override
public void init() throws Exception {
// ① 获取CommandCenter对象
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
if (commandCenter == null) {
RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
return;
}
// ② 准备执行
commandCenter.beforeStart();
// ③ 开始执行
commandCenter.start();
RecordLog.info("[CommandCenterInit] Starting command center: "
+ commandCenter.getClass().getCanonicalName());
}
- 从CommandCenterProvider中获取一个CommandCenter对象。
- 进行CommandCenter的预执行工作。
- 执行CommandCenter。
了解CommandCenter的操作步骤,我们接下来分析一下它的具体作用。
在CommandCenterProvider进行初次加载时,会进行一次SPI操作,CommandCenterProvider#resolveInstance
private static void resolveInstance() {
CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class);
if (resolveCommandCenter == null) {
RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
} else {
commandCenter = resolveCommandCenter;
RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass()
.getCanonicalName());
}
}
它会获取最高优先级的CommandCenter,可见CommandCenter的实现不止一个。
截稿为止,CommandCenter的默认实现有SimpleHttpCommandCenter和NettyHttpCommandCenter。
在默认的Sentinel配置中,SimpleHttpCommandCenter优先级最高,看下它的执行和预执行部分。
public void beforeStart() throws Exception {
// 注册所有的命令处理句柄
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
在预执行过程中,注册了Sentinel所需要的所有上报命令处理句柄。
大致包括:请求应用的所有规则,客户端配置,客户端状态、Http请求信息等,以及更新规则等写操作。
我们继续看执行操作:
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
// ① 创建一个线程池
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor"),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
// ② 创建一个线程任务
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
// ③ 执行线程任务
new Thread(serverInitTask).start();
}
- 创建一个线程池。
- 创建一个线程任务,并且开启了一个默认端口8719的Socket连接,这个线程任务是一个服务线程任务。
- 开始执行这个线程任务。
任务调度
那么这个线程池和调度任务,具体负责什么工作呢?
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
// ① 创建一个HttpEventTask,并执行
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
...
}
}
}
}
这里创建并使用SimpleHttpCommandCenter#start()方法执行这个事件任务。
我们继续看HttpEventTask:
@Override
public void run() {
if (socket == null) {
return;
}
BufferedReader in = null;
PrintWriter printWriter = null;
try {
long start = System.currentTimeMillis();
in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String line = in.readLine();
CommandCenterLog.info("[SimpleHttpCommandCenter] socket income: " + line
+ "," + socket.getInetAddress());
CommandRequest request = parseRequest(line);
// ① 解析Http请求
if (line.length() > 4 && StringUtil.equalsIgnoreCase("POST", line.substring(0, 4))) {
String bodyLine = null;
boolean bodyNext = false;
boolean supported = false;
int maxLength = 8192;
while (true) {
if (bodyNext) {
if (!supported) {
break;
}
char[] bodyBytes = new char[maxLength];
int read = in.read(bodyBytes);
String postData = new String(bodyBytes, 0, read);
parseParams(postData, request);
break;
}
bodyLine = in.readLine();
if (bodyLine == null) {
break;
}
if (StringUtil.isEmpty(bodyLine)) {
bodyNext = true;
continue;
}
int index = bodyLine.indexOf(":");
if (index < 1) {
continue;
}
String headerName = bodyLine.substring(0, index);
String header = bodyLine.substring(index + 1).trim();
if (StringUtil.equalsIgnoreCase("content-type", headerName)) {
if (StringUtil.equals("application/x-www-form-urlencoded", header)) {
supported = true;
} else {
break;
}
} else if (StringUtil.equalsIgnoreCase("content-length", headerName)) {
try {
int len = new Integer(header);
if (len > 0) {
maxLength = len;
}
} catch (Exception e) {
}
}
}
}
// ② 获取执行命令
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
badRequest(printWriter, "Invalid command");
return;
}
// ③ 执行具体操作
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
} else {
badRequest(printWriter, "Unknown command `" + commandName + '`');
}
// ④ 返回执行结果
printWriter.flush();
...
} catch (Throwable e) {
...
} finally {
...
}
}
- POST请求,从Body中解析参数。
- 获取并校验此次执行的命令。
- 从在与执行操作中注册的CommandHandler获取对应的命令,并执行handle()操作。
- 无论成功与失败,返回对应的响应。
随意给大家看一下请求样式:
GET /setRules?type=flow&data=%5B%7B%22clusterMode%22%3Afalse%2C%22controlBehavior%22%3A0%2C%22count%22%3A2.0%2C%22grade%22%3A1%2C%22limitApp%22%3A%22default%22%2C%22maxQueueingTimeMs%22%3A500%2C%22resource%22%3A%22sentinelTest%22%2C%22strategy%22%3A0%2C%22warmUpPeriodSec%22%3A10%7D%5D HTTP/1.1, address: /172.19.164.140, time cost: 3188 ms
HeartbeatSenderInitFunc
HeartbeatSenderInitFunc用于和Sentinel服务端进行心跳机制,核心实现init():。
@Override
public void init() {
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
// ① 创建调度器
initSchedulerIfNeeded();
// ② 获取并设置默认的心跳间隔
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
// ③ 执行心跳任务
scheduleHeartbeatTask(sender, interval);
}
HeartbeatSenderProvider获取HeartbeatSender的策略和CommandCenter类似,均是通过SPI获取对应最高优先级的执行对象,我们仍然以SimpleHttpHeartbeatSender进行举例。
- 创建一个coolPoolSize为2,拒绝策略为丢弃最久任务的任务调度线程池。
- 获取并设置默认的心跳间隔时间,默认是5000ms。
- 执行心跳任务。
我们继续看如何执行心跳任务,HeartbeatSenderInitFunc#scheduleHeartbeatTask:
private void scheduleHeartbeatTask(final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
我们可以看出这个任务是延迟5000ms执行,每次任务的执行间隔为刚才我们计算出心跳间隔时间。
我们继续看心跳的具体实现,SimpleHttpHeartbeatSender#sendHeartbeat():
@Override
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
...
return false;
}
// ① 获取服务端请求地址
InetSocketAddress addr = getAvailableAddress();
if (addr == null) {
return false;
}
// ② 组装Http请求
SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
request.setParams(heartBeat.generateCurrentMessage());
try {
// ③ 执行Http请求
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
}
} catch (Exception e) {
...
}
return false;
}
- 从系统配置中,获取服务端的请求地址。
- 组装Http请求。
- 执行Http请求。
总结
从上面的源码分析过程中,我们可以大致总结一下我们的服务第一次上报的整体流程:
- 使用@SentinelResource的服务,触发第一次执行后,会执行一次AOP的环绕增强。
- 在环绕增强中,会进行Sentinel资源的上报以及对资源的释放。
- 对SphU#Env对象的首次调用将会触发Sentinel一系列功能的初始化,也就是Sentinel的延迟加载。
- Sentinel的延迟加载,通过SPI模式默认加载了命令处理和心跳处理两类InitFunc。
- 命令处理用于处理所有命令,比如规则上报,或者是来自Sentinel规则平台操作的规则更新。
- 心跳处理用于维持和Sentinel控制台的心跳,心跳时间有特殊的计算逻辑,并非固定时间间隔触发。