标准 专业
多元 极客

JDK研究院(3)——FutureTask源码分析

1 Future

FutureFutureTask最顶层的接口。

Future是一个接口,它代表着异步执行过程的结果,比如我们向线程池中提交任务,返回的就是Future类型的结果。

Future提供了一些方法用于判断执行是否完成,是否去阻塞等待完成执行。

我们可以通过get()方法获取异步执行结果,通过cancel()方法判断执行是否已经取消了。

我们还可以通过其他方法判断任务到底是正常完成了,还是取消了。

如果一个任务已经完成了,那么就代表一个终结状态,不能变为已取消。

1.1 核心方法

方法 用途 备注
isCannceled() 判断任务是否已取消 任务没有正常完成,即视为已取消
cancel() 尝试去取消任务的执行 在任务已经完成,或者任务已经取消时,会取消失败 如果取消任务成功,那么这个任务是不会继续执行的 如果mayInterruptIfRunning为true,则这个任务可能会在执行过程中中取消
isDone() 任务是否已完成 正常结束、异常、取消,都属于已完成的状态
get() 等待任务执行结果 如果没有入参,是一直阻塞等待;如果有入参,在给定的时间内进行阻塞等待

2 RunnableFuture

RunnableFuture是一个既实现了Runnable,又实现了Future的接口,官方声明为一个拥有Runnable功能的Future。

也就是既能运行任务,也能根据获取运行任务的返回结果。

具体实现来看FutureTask

3 FutureTask

FutureTask是一个可以取消的异步执行任务,使用Future来管理任务的运行状态,获取运行结果,使用通过实现的Callable或者Runnable运行实际任务。

不同于之前的AQS版本,现在仅通过CAS的方式依赖一个state成员变量,以及使用一个简单的Treiber Stack来管理等待的线程。

因为原来的AQS的情况,很有可能会出现,线程A执行Task A和Task B两个任务,我们先对Task A的Future执行中断,在一些特殊情况下,线程池可能认为Task A已经取消了,线程A已经开始执行Task B了,这个时候取消的可能是Task B,让用户对取消的任务产生误解。

JDK BUG:JDK-8016247

3.1 任务状态

状态 含义
NEW 0 创建状态
COMPLETING 1 完成中
NORMAL 2 正常完成
EXCEPTIONAL 3 执行中出现异常
CANCELLED 4 已取消
INTERRUPTING 5 执行任务的线程处于中断中
INTERRUPTED 6 执行任务的线程已中断

3.2 状态流转

  • NEW -> COMPLETING -> NORMAL,正常完成。
  • NEW -> COMPLETING -> EXCEPTIONAL,任务执行过程中发生异常。
  • NEW -> CANCELLED,任务已取消。
  • NEW -> INTERRUPTING -> INTERRUPTED,执行任务的线程发生了中断。

3.3 核心成员变量

成员变量 用途
callable 需要执行的任务,任务运行后置为null
outcome 任务执行返回的结果,或者是需要抛出的异常
runner 执行任务的线程
waiters 一个无锁栈,用于存储等待的线程

3.4 创建FutureTask

目前FutureTask支持Callable和Runnable两种任务类型。

/**
 * Callable类型
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
/**
 * Runnable类型,自定义返回值
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

创建Callable类型的任务,即是将成员变量callable赋值给创建的任务,同时将任务的状态修改为NEW,因为此时是不会出现并发的情况。

创建Runnable类型的任务,也是将成员变量callable赋值给创建的任务,并将任务的状态修改为NEW,不同的是,我们需要将添加的Runnable类型的任务转换为Callable类型的任务:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

采用适配器的方式,返回一个Callable类型

private static final class RunnableAdapter<T> implements Callable<T> {
    private final Runnable task;
    private final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
    public String toString() {
        return super.toString() + "[Wrapped task = " + task + "]";
    }
}

RunnableAdapter内部实现了call()方法,在call()方法中,执行任务的run()方法,并返回自定义的返回值

3.5 核心方法

3.5.1 run()

调用FutureTask#run()方法即可执行任务。

public void run() {
    // 执行任务的前提条件
    // 任务状态为NEW
    // 可以使用当前线程执行(CAS操作成功)
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行任务并获取返回结果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 任务执行失败,构建任务执行失败结果
                result = null;
                ran = false;
                setException(ex);
            }
            // 任务执行成功,构建任务执行成功结果
            if (ran)
                set(result);
        }
    } finally {
        // 无论执行成功或是失败,当前线程都要让出任务的所有权
        runner = null;
        // 重新读取并校验任务状态
        int s = state;
        if (s >= INTERRUPTING)
            // 任务被中断,执行任务中断流程
            handlePossibleCancellationInterrupt(s);
    }
}
  1. 只有在任务状态为NEW,或者CAS操作使用当前线程的前提条件下,才可以执行任务。
  2. 执行任务并返回任务执行结果。
  3. 执行成功,或者失败,分别构建对应的结果。
  4. 无论任务执行成功或者失败,当前线程都要让出任务的所有权。
  5. 还需要重新检查下任务的当前状态,如果任务被中断,就要做一些中断的后置处理。

3.5.2 runAndReset()

protected boolean runAndReset() {
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call();
                ran = true;
                // 无需设置返回值
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

乍一看和run()方法执行逻辑是差不多的,但是有一些细微的变化:

  1. 任务执行成功,无需设置返回值,也没有修改任务的执行状态,只有在执行出现异常,或者任务出现中断的情况下,才会对任务状态进行修改
  2. 返回结果为任务执行结果与任务状态为NEW的与运算

仔细想一想,runAndReset()其实适用于需要重复执行的任务。

说起重复执行的任务,我们会想起ScheduledThreadPoolExecutor

我们来看如何取消任务。

3.5.3 cancel()

mayInterruptIfRunning:用于判断是否可以中断线程的执行

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务状态不为NEW,或者CAS操作任务状态由NEW状态改为INTERRUPTING或者CANCELLED失败,视为取消任务失败
    if (!(state == NEW && STATE.compareAndSet
          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        // 如果mayInterruptIfRunning为true,任务状态此时为INTERRUPTING
        // 否则任务状态为CANCELLED
        if (mayInterruptIfRunning) {
            try {
                // 在可以中断线程的情况下对线程进行中断,最后修改任务状态为INTERRUPTED
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                STATE.setRelease(this, INTERRUPTED);
            }
        }
    } finally {
        // 任务进入完成状态
        finishCompletion();
    }
    return true;
}
  1. 如果任务状态不为NEW,或者将任务状态由NEW变为INTERRUPTING或CANCELLED状态失败,判定取消失败。
  2. 此时,任务状态已经被更新为INTERRUPTING或CANCELLED。
  3. 如果可以在任务执行过程中中断,则对执行任务的线程进行中断,更新任务的状态为INTERRUPTED,这一步是为了是任务到达一个最终状态。
  4. 执行任务完成的后续操作。

我们继续看任务进入完成状态后的处理。

3.6 任务完成

3.6.1 finishCompletion()

finishCompletion()用于执行任务的完成处理。

它会唤醒并移除在等待栈中所有等待的线程,调用done()方法,并且将成员变量callable置为null,减少可达性分析扫描。

private void finishCompletion() {
    // 双重自旋
    // 遍历等待线程栈中所有的等待线程
    for (WaitNode q; (q = waiters) != null;) {
        // 将等待线程栈的这块内存空间置为null
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            // 自旋唤醒所有等待线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // 减少垃圾回收的可达性分析
                // 链表节点传递
                q.next = null;
                q = next;
            }
            break;
        }
    }

    // 执行done()方法
    done();

    // callable置为null,减少垃圾回收的可达性分析
    callable = null;
}
  1. 由于waiters是一个存放等待线程的链表,并且存在并发的可能性,所以FutureTask采用了双自旋的形式来保证唤醒所有等待的线程。
  2. 先将等待线程栈这块内存置为null,然后再自旋唤醒所有的等待的线程,链表不断的转移下一个等待线程的指针。
  3. 执行done()方法,在FutureTask中是一个空实现,相当于任务结束后的后置处理ListenableFutureTask里面实现done()方法。

3.6.2 set()

任务执行成功,我们需要构建任务执行成功结果。

protected void set(V v) {
    // 任务状态NEW -> COMPLETING,任务处于完成中状态
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 任务执行结果
        outcome = v;
        // 设置任务状态为NORMAL,到达任务一个终结状态
        STATE.setRelease(this, NORMAL);
        // 任务完成的后置处理
        finishCompletion();
    }
}
  1. 任务成功执行后,任务状态的流转为:NEW -> COMPLETING -> NORMAL
  2. 进行任务完成的后置处理。

3.6.3 setException()

任务执行过程中出现了异常,我们需要构建任务执行异常的结果。

protected void setException(Throwable t) {
    // 任务状态NEW -> COMPLETING,任务处于完成中状态
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = t;
        // 设置任务状态为EXCEPTIONAL,达到一个终结状态
        STATE.setRelease(this, EXCEPTIONAL);
        // 任务执行异常的后置处理
        finishCompletion();
    }
}
  1. 任务执行过程中出现异常后,任务状态的流转为:NEW -> COMPLETING -> NORMAL。
  2. 进行任务执行异常的后置处理。

3.6.4 handlePossibleCancellationInterrupt()

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield();
}

任务处于中断状态时,会暂停执行任务的线程。

3.7 获取结果

我们继续看获取任务执行结果get()方法。

3.7.1 get()

从上文可知,我们可以通过两种方式获取任务的执行结果。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
  1. 首先获取当前任务的状态,并判断任务是否是完成中或者创建状态。
  2. 如果是,会等待任务达到终结状态。
  3. 返回任务的输出结果。
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
  1. 既然需要在给定时间内阻塞等待任务执行结果,那么时间单位是必须参数。
  2. 如果任务当前状态处于COMPLETING或者NEW,那么就在指定的时间范围内等待任务执行结果;如果超时后任务仍然处于COMPLETING或者NEW,那么就抛出TimeOutException超时异常。
  3. 走到这里任务已经处于一个终结状态,直接返回任务执行结果。

我们继续看等待任务执行完成的awaitDone()方法。

3.7.2 awaitDone()

awaitDone()有两个入参:

  • timed,是一直阻塞还是只是阻塞一段时间。
  • nanos,阻塞的时间,单位纳秒。
private int awaitDone(boolean timed, long nanos)
		throws InterruptedException {
    // startTime为0代表线程还没有挂起
	long startTime = 0L;
	WaitNode q = null;
    boolean queued = false;
    // 自旋
	for (;;) {
        int s = state;
        // 如果任务处于一个终结状态
		if (s > COMPLETING) {
			if (q != null)
				q.thread = null;
			return s;
        }
        // 如果任务处于完成中状态
		else if (s == COMPLETING)
			// 当前线程释放资源进行等待
            Thread.yield();
        // 如果线程已经被中断了
		else if (Thread.interrupted()) {
            // 从等待线程队列中移除等待线程
			removeWaiter(q);
			throw new InterruptedException();
        }
        // 如果没有等待当前任务执行完成线程
        // 每次调用get()方法,在正常情况下都会走到这里
		else if (q == null) {
            // 如果在需要时间段内返回任务执行结果的情况下,等待时间为一个异常的数字,直接返回当前执行的状态
			if (timed && nanos <= 0L)
                return s;
            // 创建一个等待线程的节点,就是把当前线程当做一个等待线程,放入到等待队列中
			q = new WaitNode();
        }
        // 在当前执行线程没有进入等待队列的情况下,尝试入队
		else if (!queued)
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        // 如果需要在指定时间内返回任务执行结果
		else if (timed) {
            final long parkNanos;
			if (startTime == 0L) {
                // 当前JVM运行时间的纳秒标识
				startTime = System.nanoTime();
				if (startTime == 0L)
					startTime = 1L;
				parkNanos = nanos;
			} else {
                // 执行当前线程已经阻塞的时间
                long elapsed = System.nanoTime() - startTime;
                // 如果已经超过指定的时间,删除当前等待的线程,返回任务当前状态
				if (elapsed >= nanos) {
					removeWaiter(q);
					return state;
                }
                // 没有超时的情况下,计算新的阻塞时间
				parkNanos = nanos - elapsed;
			}
			// 如果当前任务处于创建状态,那么就挂起当前等待线程,等待任务执行
			if (state < COMPLETING)
				LockSupport.parkNanos(this, parkNanos);
		}
        else
            // 走到这里证明没有超时时间的约束,FutureTask会一直挂起线程
            // 由于AQS的特点,任务执行完成后,调用unpark就会唤醒当前等待的线程
			LockSupport.park(this);
	}
}
  1. 我们会把阻塞获取任务执行结果的线程,当做一个等待线程,放入等待线程栈中,在正常情况下,我们都会创建一个等待线程。
  2. 如果有阻塞时间的限制,在每次自旋时,我们都会计算一次阻塞的时间,并实时更新等待线程挂起的时间
  3. 如果没有阻塞时间的限制,我们会一直阻塞等待线程,直到唤醒。

什么时候会唤醒等待任务执行的线程呢?

finishCompletion()中,会调用LockSupport.unpart()

所以,在以下情况下会在awaitDone()的情况下跳出自旋:

  1. 在指定阻塞时间范围内,完成了任务执行。
  2. 等待任务执行完成的线程已经被中断,抛出了线程中断的异常。
  3. 阻塞时间是一个异常数字。
  4. 超过了阻塞时间

3.7.3 report()

FutureTask#report()用于返回执行任务的结果,可能是一个值,也可能是异常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

此时任务状态经由上面的流转,必大于COMPLETING

  1. 任务状态为NORMAL,返回任务执行成功返回的值。
  2. 如果任务状态为CANCELEDINTERRUPTINGINTERRUPTED,抛出CancellationException,视为任务已取消。
  3. 此时,任务状态仅能为EXCEPTIONAL,在执行任务过程中出现了异常,此时执行结果为Throwable类型,故包装为ExecutionException()后抛出。

3.8 WaiterNode

WaiterNode是用于存储等待线程,Thread即为等待线程,next为指向下一个等待节点的指针。

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

创建WaitNode时,WaitNode会直接把当前线程作为等待线程放入到等待线程栈中。

3.8.1 removeWaiter()

我们在创建WaitNode时,会操作把WaitNodenext指针,指向成员变量waiters,完成入栈的操作。

在等待线程无法继续等待下去时,FutureTask提供了removeWaiter()方法,将等待线程从waiters中删除。

private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 首先,将等待线程的引用去掉
        // 相当于一个标识,标识thread = null的线程将会被清除掉
        node.thread = null;
        retry:
        // 自旋
        for (;;) {
            // 遍历waiters栈,找到对应的waitNode,进行节点替换
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next; // null
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null)
                        continue retry;
                }
                else if (!WAITERS.compareAndSet(this, q, s))
                    continue retry;
            }
            break;
        }
    }
}
  1. 首先将要删除节点的thread标识为null,在遍历过程中,如果发现节点thread为null的情况,就会删除该节点。
  2. 在没有从头到尾遍历一次等待线程栈的情况下,会一直自旋删除节点。
  3. 然后经遍历,找到对应要删除的节点,进行移除。

我们继续看下移除节点的情景:

  1. 在栈的结构为waitNodeOne(thread=threadOne) -> waitNodeTwo(thread=null) -> waitNodeThree(thread=threadThree)时:① 第一次循环,q = waitNodeOne; s = waitNodeTwo; pred = waitNodeOne② 第二次循环,q = waitNodeTwo; s = waitNodeThree; pred = waitNodeOne; pred.next = waitNodeThree
  2. 在栈的结构为waitNodeOne(thread=null) -> waitNodeTwo(thread=threadTwo)时,q=waitNodeOnes=waitNodeTwo,这种栈顶为指定删除元素时,CAS操作修改waiters为waitNodeTwo
  3. 如果CAS操作失败,证明有其他线程了waiters,可能已经被打乱,就需要重新进行一次自旋遍历。

4 总结

FutureTask实现了RunnableTask,RunnableTask同时实现了Runnable和Future,FutureTask作为一个可向线程池中提交的可执行任务,包括了任务执行,任务状态流转,任务执行结果组装,等待获取结果的线程唤醒等功能。

赞(2) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫