本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫
線程池源碼中出現了很多Callable、Future、FutureTask等以前沒介紹過的接口,尤其是線程池提交任務時總是把任務封裝成FutureTask,今天就來為大家解惑:
Runnable、Callable、Future、FutureTaskFutureTask類結構FutureTask狀態執行任務 run()方法獲取任務返回值 get()方法取消任務 cancel()方法1. Runnable、Callable、Future、FutureTask
1.1 Runnable
Runnable接口只有一個run方法,而run方法的返回值是void,所以線程執行完之後沒有返回值。
public interface Runnable {
public abstract void run();
}
1.2 Callable
在很多場景下,我們通過線程來異步執行任務之後,希望獲取到任務的執行結果。比如RPC框架中,需要異步獲取任務返回值。這種情況下,Runnable無法獲取返回值就無法滿足需求了,因此Callable就出現了。
Callable也是一個接口,也只有一個call()方法,不同的是Callable的call()方法有是有返回值的,返回值的類型是一個泛型,泛型由創建Callable對象時指定。
public interface Callable<V> {
V call() throws Exception;
}
1.3 Future
要想獲得Callable的返回值就需要用到Future接口。Futrue可以監視和控制Callable任務的執行情況,如對執行結果進行取消、查詢是否完成、獲取結果等。
如:當一個任務通過線程池的submit()方法提交到線程池後,線程池會返回一個Future類型的對象,我們可以通過Future對象來獲取任務在線程池中的狀態。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
cancel方法:用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。mayInterruptIfRunning參數用來表示是否需要中斷線程,如果傳true,表示需要中斷線程,那麼就會將任務的狀態設置為INTERRUPTING;如果為false,那麼就會將任務的狀態設置為CANCELLED(關於任務的狀態INTERRUPTING和CANCELLED後面會說明)isCancelled方法:表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 trueisDone方法:表示任務是否已經完成,若任務完成,則返回trueget()方法:用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回。get(long timeout, TimeUnit unit)方法:獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。舉例:Future獲取Callable任務的返回值
public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "結果";
}
});
System.out.println("Callable返回值=" + future.get());
}
}
輸出結果:
Callable返回值=結果
1.4 FutureTask
FutureTask是Runnable和Future的實現類,既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。
當線程池調用submit()方法來向線程池中提交任務時,無論提交的是Runnable類型的任務,還是提交的是Callable類型的任務,最終都是將任務封裝成一個FutureTask對象,我們可以通過這個FutureTask對象來獲取任務在線程池中的狀態。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 調用newTaskFor()將Callable任務封裝成一個FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 執行任務
execute(ftask);
return ftask;
}
// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 直接new一個FutureTask對象
return new FutureTask<T>(callable);
}
2. FutureTask類結構
public class FutureTask<V> implements RunnableFuture<V> {
/** state變量用來保存任務的狀態 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 提交的任務,Runnable類型的任務會通過Executors.callable()來轉變為Callable */
private Callable<V> callable;
/** 用來保存Callable的call()方法的返回值 */
private Object outcome;
/** 執行Callable任務的線程 **/
private volatile Thread runner;
/**
* 任務未完成時,調用get方法獲取結果的線程會阻塞等待
* waiters用於保存這些線程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
3. FutureTask狀態
FutureTask任務的狀態如下:
// 任務的初始狀態,當新建一個FutureTask任務時,state值默認為NEW
private static final int NEW = 0;
// 任務處於完成中,也就是正在執行還未設置返回值
private static final int COMPLETING = 1;
// 任務正常被執行完成,並將任務的返回值賦值給outcome屬性之後
private static final int NORMAL = 2;
// 任務出了異常,並將異常對象賦值給outcome屬性之後
private static final int EXCEPTIONAL = 3;
// 調用cancle(false),任務被取消了
private static final int CANCELLED = 4;
// 調用cancle(true),任務中斷,但是在線程中斷之前
private static final int INTERRUPTING = 5;
// 調用cancle(true),任務中斷,但是在線程中斷之後
private static final int INTERRUPTED = 6;
狀態變化如下圖:
4. 執行任務run()
執行future.callable.call(),執行任務;執行成功,設置結果outcome;逐個喚醒waiters中的線程去獲取執行結果。
public void run() {
/*
* 1. 不是NEW狀態,不能執行
* 2. 設置runner失敗,不能執行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 真正執行任務
ran = true;// 執行成功,設置執行成功標誌
} catch (Throwable ex) {
result = null;
ran = false;// 有異常,執行失敗
setException(ex);// 設置異常
}
// 如果執行成功,則設置返回結果
if (ran)
set(result);
}
} finally {
runner = null;// 無論是否執行成功,把runner設置為null
int s = state;
// 處理中斷
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* 設置執行結果
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 執行完成,設置COMPLETING狀態
outcome = v;// 設置執行結果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 設置完結果,設置NORMAL狀態
finishCompletion();// 逐個喚醒waiters中的線程去獲取執行結果
}
}
5. 獲取任務返回值get()方法
任務狀態為NORMAL,直接返回執行結果;任務狀態為COMPLETING,線程yield()讓出CPU,因為COMPLETING到NORMAL只需要很短的時間,get線程讓出CPU的短暫時間,任務狀態就是從COMPLETING變成了NORMAL;任務狀態為NEW,將get線程阻塞,如果設置了超時,阻塞至超時時間;如果沒有設置超時,會一直阻塞直到任務完成後喚醒。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果狀態處於NEW或者COMPLETING狀態,表示任務還沒有執行完成,awaitDone()等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);// 下文詳解
// 返回結果,下文詳解
return report(s);
}
/**
* 返回執行結果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任務正常結束時,返回outcome
if (s == NORMAL)
return (V)x;
// 任務被取消了,拋出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 這裡只能第EXCEPTIONAL狀態,表示在執行過程中出現了異常,拋出ExecutionException。
throw new ExecutionException((Throwable)x);
}
/**
* 處於NEW或者COMPLETING狀態時,get線程等待
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// ......
for (;;) {
// ......
// 任務處於COMPLETING中,就讓當前線程先暫時放棄CPU的執行權
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// ......
// 如果設置了超時,阻塞至超時時間
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 等待一段時間
LockSupport.parkNanos(this, nanos);
}
else
// 如果沒有設置超時,會一直阻塞,直到被中斷或者被喚醒
LockSupport.park(this);
}
}
6. 取消任務 cancel()
將任務狀態設置成INTERRUPTING/INTERRUPTED/CANCELLED狀態就表示取消了線程,因為在這些狀態下任務的run方法是不能執行的。
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 以下情況不能取消任務:
* 1. 當前任務不是NEW狀態,已經被執行了,不能取消
* 2. 當前任務還沒有執行,state == NEW,但是CAS設置狀態失敗,不能取消
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 中斷
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();// 中斷線程
} finally { // final state
// 中斷之後,設置INTERRUPTED狀態
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();// 喚醒waiters中的線程去獲取執行結果
}
return true;
}