1 Future
Future是FutureTask最顶层的接口。
Future是一个接口,它代表着异步执行过程的结果,比如我们向线程池中提交任务,返回的就是Future类型的结果。
Future提供了一些方法用于判断执行是否完成,是否去阻塞等待完成执行。
我们可以通过get()方法获取异步执行结果,通过cancel()方法判断执行是否已经取消了。
我们还可以通过其他方法判断任务到底是正常完成了,还是取消了。
如果一个任务已经完成了,那么就代表一个终结状态,不能变为已取消。
1.1 核心方法
方法 | 用途 | 备注 |
---|---|---|
isCannceled() | 判断任务是否已取消 | 任务没有正常完成,即视为已取消 |
cancel() | 尝试去取消任务的执行 | 在任务已经完成,或者任务已经取消时,会取消失败 如果取消任务成功,那么这个任务是不会继续执行的 如果mayInterruptIfRunning为true,则这个任务可能会在执行过程中中取消 |
isDone() | 任务是否已完成 | 正常结束、异常、取消,都属于已完成的状态 |
get() | 等待任务执行结果 | 如果没有入参,是一直阻塞等待;如果有入参,在给定的时间内进行阻塞等待 |
2 RunnableFuture
RunnableFuture是一个既实现了Runnable,又实现了Future的接口,官方声明为一个拥有Runnable功能的Future。
也就是既能运行任务,也能根据获取运行任务的返回结果。
具体实现来看FutureTask。
3 FutureTask
FutureTask是一个可以取消的异步执行任务,使用Future来管理任务的运行状态,获取运行结果,使用通过实现的Callable或者Runnable运行实际任务。
不同于之前的AQS版本,现在仅通过CAS的方式依赖一个state成员变量,以及使用一个简单的Treiber Stack来管理等待的线程。
因为原来的AQS的情况,很有可能会出现,线程A执行Task A和Task B两个任务,我们先对Task A的Future执行中断,在一些特殊情况下,线程池可能认为Task A已经取消了,线程A已经开始执行Task B了,这个时候取消的可能是Task B,让用户对取消的任务产生误解。
JDK BUG:JDK-8016247
3.1 任务状态
状态 | 值 | 含义 |
---|---|---|
NEW | 0 | 创建状态 |
COMPLETING | 1 | 完成中 |
NORMAL | 2 | 正常完成 |
EXCEPTIONAL | 3 | 执行中出现异常 |
CANCELLED | 4 | 已取消 |
INTERRUPTING | 5 | 执行任务的线程处于中断中 |
INTERRUPTED | 6 | 执行任务的线程已中断 |
3.2 状态流转
- NEW -> COMPLETING -> NORMAL,正常完成。
- NEW -> COMPLETING -> EXCEPTIONAL,任务执行过程中发生异常。
- NEW -> CANCELLED,任务已取消。
- NEW -> INTERRUPTING -> INTERRUPTED,执行任务的线程发生了中断。
3.3 核心成员变量
成员变量 | 用途 |
---|---|
callable | 需要执行的任务,任务运行后置为null |
outcome | 任务执行返回的结果,或者是需要抛出的异常 |
runner | 执行任务的线程 |
waiters | 一个无锁栈,用于存储等待的线程 |
3.4 创建FutureTask
目前FutureTask支持Callable和Runnable两种任务类型。
/**
* Callable类型
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Runnable类型,自定义返回值
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
创建Callable类型的任务,即是将成员变量callable赋值给创建的任务,同时将任务的状态修改为NEW,因为此时是不会出现并发的情况。
创建Runnable类型的任务,也是将成员变量callable赋值给创建的任务,并将任务的状态修改为NEW,不同的是,我们需要将添加的Runnable类型的任务转换为Callable类型的任务:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
采用适配器的方式,返回一个Callable类型:
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
RunnableAdapter内部实现了call()方法,在call()方法中,执行任务的run()方法,并返回自定义的返回值。
3.5 核心方法
3.5.1 run()
调用FutureTask#run()方法即可执行任务。
public void run() {
// 执行任务的前提条件
// 任务状态为NEW
// 可以使用当前线程执行(CAS操作成功)
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务并获取返回结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 任务执行失败,构建任务执行失败结果
result = null;
ran = false;
setException(ex);
}
// 任务执行成功,构建任务执行成功结果
if (ran)
set(result);
}
} finally {
// 无论执行成功或是失败,当前线程都要让出任务的所有权
runner = null;
// 重新读取并校验任务状态
int s = state;
if (s >= INTERRUPTING)
// 任务被中断,执行任务中断流程
handlePossibleCancellationInterrupt(s);
}
}
- 只有在任务状态为NEW,或者CAS操作使用当前线程的前提条件下,才可以执行任务。
- 执行任务并返回任务执行结果。
- 执行成功,或者失败,分别构建对应的结果。
- 无论任务执行成功或者失败,当前线程都要让出任务的所有权。
- 还需要重新检查下任务的当前状态,如果任务被中断,就要做一些中断的后置处理。
3.5.2 runAndReset()
protected boolean runAndReset() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call();
ran = true;
// 无需设置返回值
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
乍一看和run()方法执行逻辑是差不多的,但是有一些细微的变化:
- 任务执行成功,无需设置返回值,也没有修改任务的执行状态,只有在执行出现异常,或者任务出现中断的情况下,才会对任务状态进行修改。
- 返回结果为任务执行结果与任务状态为NEW的与运算。
仔细想一想,runAndReset()其实适用于需要重复执行的任务。
说起重复执行的任务,我们会想起ScheduledThreadPoolExecutor。
我们来看如何取消任务。
3.5.3 cancel()
mayInterruptIfRunning:用于判断是否可以中断线程的执行
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果任务状态不为NEW,或者CAS操作任务状态由NEW状态改为INTERRUPTING或者CANCELLED失败,视为取消任务失败
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果mayInterruptIfRunning为true,任务状态此时为INTERRUPTING
// 否则任务状态为CANCELLED
if (mayInterruptIfRunning) {
try {
// 在可以中断线程的情况下对线程进行中断,最后修改任务状态为INTERRUPTED
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
// 任务进入完成状态
finishCompletion();
}
return true;
}
- 如果任务状态不为NEW,或者将任务状态由NEW变为INTERRUPTING或CANCELLED状态失败,判定取消失败。
- 此时,任务状态已经被更新为INTERRUPTING或CANCELLED。
- 如果可以在任务执行过程中中断,则对执行任务的线程进行中断,更新任务的状态为INTERRUPTED,这一步是为了是任务到达一个最终状态。
- 执行任务完成的后续操作。
我们继续看任务进入完成状态后的处理。
3.6 任务完成
3.6.1 finishCompletion()
finishCompletion()用于执行任务的完成处理。
它会唤醒并移除在等待栈中所有等待的线程,调用done()方法,并且将成员变量callable置为null,减少可达性分析扫描。
private void finishCompletion() {
// 双重自旋
// 遍历等待线程栈中所有的等待线程
for (WaitNode q; (q = waiters) != null;) {
// 将等待线程栈的这块内存空间置为null
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 自旋唤醒所有等待线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 减少垃圾回收的可达性分析
// 链表节点传递
q.next = null;
q = next;
}
break;
}
}
// 执行done()方法
done();
// callable置为null,减少垃圾回收的可达性分析
callable = null;
}
- 由于waiters是一个存放等待线程的链表,并且存在并发的可能性,所以FutureTask采用了双自旋的形式来保证唤醒所有等待的线程。
- 先将等待线程栈这块内存置为null,然后再自旋唤醒所有的等待的线程,链表不断的转移下一个等待线程的指针。
- 执行done()方法,在FutureTask中是一个空实现,相当于任务结束后的后置处理,ListenableFutureTask里面实现done()方法。
3.6.2 set()
任务执行成功,我们需要构建任务执行成功结果。
protected void set(V v) {
// 任务状态NEW -> COMPLETING,任务处于完成中状态
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 任务执行结果
outcome = v;
// 设置任务状态为NORMAL,到达任务一个终结状态
STATE.setRelease(this, NORMAL);
// 任务完成的后置处理
finishCompletion();
}
}
- 任务成功执行后,任务状态的流转为:NEW -> COMPLETING -> NORMAL。
- 进行任务完成的后置处理。
3.6.3 setException()
任务执行过程中出现了异常,我们需要构建任务执行异常的结果。
protected void setException(Throwable t) {
// 任务状态NEW -> COMPLETING,任务处于完成中状态
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
// 设置任务状态为EXCEPTIONAL,达到一个终结状态
STATE.setRelease(this, EXCEPTIONAL);
// 任务执行异常的后置处理
finishCompletion();
}
}
- 任务执行过程中出现异常后,任务状态的流转为:NEW -> COMPLETING -> NORMAL。
- 进行任务执行异常的后置处理。
3.6.4 handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
任务处于中断状态时,会暂停执行任务的线程。
3.7 获取结果
我们继续看获取任务执行结果get()方法。
3.7.1 get()
从上文可知,我们可以通过两种方式获取任务的执行结果。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
- 首先获取当前任务的状态,并判断任务是否是完成中或者创建状态。
- 如果是,会等待任务达到终结状态。
- 返回任务的输出结果。
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
- 既然需要在给定时间内阻塞等待任务执行结果,那么时间单位是必须参数。
- 如果任务当前状态处于COMPLETING或者NEW,那么就在指定的时间范围内等待任务执行结果;如果超时后任务仍然处于COMPLETING或者NEW,那么就抛出TimeOutException超时异常。
- 走到这里任务已经处于一个终结状态,直接返回任务执行结果。
我们继续看等待任务执行完成的awaitDone()方法。
3.7.2 awaitDone()
awaitDone()有两个入参:
- timed,是一直阻塞还是只是阻塞一段时间。
- nanos,阻塞的时间,单位纳秒。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// startTime为0代表线程还没有挂起
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
int s = state;
// 如果任务处于一个终结状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果任务处于完成中状态
else if (s == COMPLETING)
// 当前线程释放资源进行等待
Thread.yield();
// 如果线程已经被中断了
else if (Thread.interrupted()) {
// 从等待线程队列中移除等待线程
removeWaiter(q);
throw new InterruptedException();
}
// 如果没有等待当前任务执行完成线程
// 每次调用get()方法,在正常情况下都会走到这里
else if (q == null) {
// 如果在需要时间段内返回任务执行结果的情况下,等待时间为一个异常的数字,直接返回当前执行的状态
if (timed && nanos <= 0L)
return s;
// 创建一个等待线程的节点,就是把当前线程当做一个等待线程,放入到等待队列中
q = new WaitNode();
}
// 在当前执行线程没有进入等待队列的情况下,尝试入队
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
// 如果需要在指定时间内返回任务执行结果
else if (timed) {
final long parkNanos;
if (startTime == 0L) {
// 当前JVM运行时间的纳秒标识
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
// 执行当前线程已经阻塞的时间
long elapsed = System.nanoTime() - startTime;
// 如果已经超过指定的时间,删除当前等待的线程,返回任务当前状态
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
// 没有超时的情况下,计算新的阻塞时间
parkNanos = nanos - elapsed;
}
// 如果当前任务处于创建状态,那么就挂起当前等待线程,等待任务执行
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
// 走到这里证明没有超时时间的约束,FutureTask会一直挂起线程
// 由于AQS的特点,任务执行完成后,调用unpark就会唤醒当前等待的线程
LockSupport.park(this);
}
}
- 我们会把阻塞获取任务执行结果的线程,当做一个等待线程,放入等待线程栈中,在正常情况下,我们都会创建一个等待线程。
- 如果有阻塞时间的限制,在每次自旋时,我们都会计算一次阻塞的时间,并实时更新等待线程挂起的时间。
- 如果没有阻塞时间的限制,我们会一直阻塞等待线程,直到唤醒。
什么时候会唤醒等待任务执行的线程呢?
在finishCompletion()中,会调用LockSupport.unpart()。
所以,在以下情况下会在awaitDone()的情况下跳出自旋:
- 在指定阻塞时间范围内,完成了任务执行。
- 等待任务执行完成的线程已经被中断,抛出了线程中断的异常。
- 阻塞时间是一个异常数字。
- 超过了阻塞时间
3.7.3 report()
FutureTask#report()用于返回执行任务的结果,可能是一个值,也可能是异常。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
此时任务状态经由上面的流转,必大于COMPLETING。
- 任务状态为NORMAL,返回任务执行成功返回的值。
- 如果任务状态为CANCELED、INTERRUPTING、INTERRUPTED,抛出CancellationException,视为任务已取消。
- 此时,任务状态仅能为EXCEPTIONAL,在执行任务过程中出现了异常,此时执行结果为Throwable类型,故包装为ExecutionException()后抛出。
3.8 WaiterNode
WaiterNode是用于存储等待线程,Thread即为等待线程,next为指向下一个等待节点的指针。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
创建WaitNode时,WaitNode会直接把当前线程作为等待线程放入到等待线程栈中。
3.8.1 removeWaiter()
我们在创建WaitNode时,会操作把WaitNode的next指针,指向成员变量waiters,完成入栈的操作。
在等待线程无法继续等待下去时,FutureTask提供了removeWaiter()方法,将等待线程从waiters中删除。
private void removeWaiter(WaitNode node) {
if (node != null) {
// 首先,将等待线程的引用去掉
// 相当于一个标识,标识thread = null的线程将会被清除掉
node.thread = null;
retry:
// 自旋
for (;;) {
// 遍历waiters栈,找到对应的waitNode,进行节点替换
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next; // null
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;
}
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
- 首先将要删除节点的thread标识为null,在遍历过程中,如果发现节点thread为null的情况,就会删除该节点。
- 在没有从头到尾遍历一次等待线程栈的情况下,会一直自旋删除节点。
- 然后经遍历,找到对应要删除的节点,进行移除。
我们继续看下移除节点的情景:
- 在栈的结构为waitNodeOne(thread=threadOne) -> waitNodeTwo(thread=null) -> waitNodeThree(thread=threadThree)时:① 第一次循环,q = waitNodeOne; s = waitNodeTwo; pred = waitNodeOne② 第二次循环,q = waitNodeTwo; s = waitNodeThree; pred = waitNodeOne; pred.next = waitNodeThree
- 在栈的结构为waitNodeOne(thread=null) -> waitNodeTwo(thread=threadTwo)时,q=waitNodeOne,s=waitNodeTwo,这种栈顶为指定删除元素时,CAS操作修改waiters为waitNodeTwo。
- 如果CAS操作失败,证明有其他线程了waiters,可能已经被打乱,就需要重新进行一次自旋遍历。
4 总结
FutureTask实现了RunnableTask,RunnableTask同时实现了Runnable和Future,FutureTask作为一个可向线程池中提交的可执行任务,包括了任务执行,任务状态流转,任务执行结果组装,等待获取结果的线程唤醒等功能。