Future 详解

Catalogue
  1. 1. Overview
  2. 2. TASK STATE
  3. 3. Get 方法
  4. 4. 参考资料

Overview

1
2
3
4
5
6
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Future 是一个还算是见的接口,我们看看究竟是怎么实现的。它有多个实现类,我们看下最为重要的

java.util.concurrent.FutureTask

TASK STATE

我们可以看见这个类最上面就是

1
2
3
4
5
6
7
8
private volatile int state;
private static final int NEW = 0; //新建
private static final int COMPLETING = 1; //结束中
private static final int NORMAL = 2; //正常结束
private static final int EXCEPTIONAL = 3; //异常结束
private static final int CANCELLED = 4; //取消
private static final int INTERRUPTING = 5; //中断中
private static final int INTERRUPTED = 6; //已中断

这里其实就是这个任务所能够转换的状态,这里的单词都很容易理解,我也添加了备注。在代码的备注中有一个很重要的说明

状态的变化可以是这样的:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

Get 方法

我们来看看最常用的Get方法。

1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

代码很简单,也就是获得当前的状态,我们判断状态如果是 COMPLETING 或者 NEW (这里用 <= )我们就去等待,其他的情况直接就返回结果。见report函数。

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

这里也就是根据返回的结果,如果是正常结束就返回,其他就返回异常。我们再来看看 awaitDone 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; //➀
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { //➁
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) { //➂
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) //➃ cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); //➄
}
else
LockSupport.park(this); //➅
}
}

这里的代码就复杂点了,我们从上到下慢慢看起来。
➀ 这里如果有超时时间的话,就是一个值,其他的就是0
➁ 这里如果运行的线程已经中断了,那我们就直接抛出异常就好了,最简单的部分。
➂ 如果结束了,我们需要返回状态即可。
➃ 因为我们的任务已经完成了,只是在等待结果导出,所以这里不能再计算超时,我们直接把线程让出来就行。
➄ 这里也就是将自己Blocking住需要超时的时间。
➅ 如果没有超时的设置,我们就将自己一直Blocking住

那问题又来了,我们什么时候才能够将自己唤醒呢?秘密就在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) { //➀
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null) //➁
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

➀ 在这里我们释放在awaitDone被Blocking的线程,这个时候继续运行下去,如果结果还是没有又会在 awaitDone 中新创建一个 WaitNode 然后被在awaitDone中被释放,直到 ➁ 完全没有任务可言的时候就算是结束。

这里有点绕的原因是因为 ➀ awaitDone 的循环中如果任务没有完成 -> ➁ awaitDone 的当前线程(主线程)被 Blocking -> ➂ finishCompletion 的运行线程(也就是任务的本体)如果任务还没有完成 -> ➃ 唤醒 awaitDone的线程(主线程) -> ➀

参考资料