「原創」Java並發編程系列36|FutureTask

2020-12-10 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

線程池源碼中出現了很多Callable、Future、FutureTask等以前沒介紹過的接口,尤其是線程池提交任務時總是把任務封裝成FutureTask,今天就來為大家解惑:

Runnable、Callable、Future、FutureTaskFutureTask類結構FutureTask狀態執行任務 run()方法獲取任務返回值 get()方法取消任務 cancel()方法1. Runnable、Callable、Future、FutureTask

1.1 Runnable

Runnable接口只有一個run方法,而run方法的返回值是void,所以線程執行完之後沒有返回值。

public interface Runnable {

public abstract void run();

}

1.2 Callable

在很多場景下,我們通過線程來異步執行任務之後,希望獲取到任務的執行結果。比如RPC框架中,需要異步獲取任務返回值。這種情況下,Runnable無法獲取返回值就無法滿足需求了,因此Callable就出現了。

Callable也是一個接口,也只有一個call()方法,不同的是Callable的call()方法有是有返回值的,返回值的類型是一個泛型,泛型由創建Callable對象時指定。

public interface Callable<V> {

V call() throws Exception;

}

1.3 Future

要想獲得Callable的返回值就需要用到Future接口。Futrue可以監視和控制Callable任務的執行情況,如對執行結果進行取消、查詢是否完成、獲取結果等。

如:當一個任務通過線程池的submit()方法提交到線程池後,線程池會返回一個Future類型的對象,我們可以通過Future對象來獲取任務在線程池中的狀態。

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

cancel方法:用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。mayInterruptIfRunning參數用來表示是否需要中斷線程,如果傳true,表示需要中斷線程,那麼就會將任務的狀態設置為INTERRUPTING;如果為false,那麼就會將任務的狀態設置為CANCELLED(關於任務的狀態INTERRUPTING和CANCELLED後面會說明)isCancelled方法:表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 trueisDone方法:表示任務是否已經完成,若任務完成,則返回trueget()方法:用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回。get(long timeout, TimeUnit unit)方法:獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。舉例:Future獲取Callable任務的返回值

public class FutureExample {

public static void main(String[] args) throws InterruptedException, ExecutionException {

ExecutorService threadPool = Executors.newCachedThreadPool();

Future<String> future = threadPool.submit(new Callable<String>() {

@Override

public String call() throws Exception {

Thread.sleep(2000);

return "結果";

}

});

System.out.println("Callable返回值=" + future.get());

}

}

輸出結果:

Callable返回值=結果

1.4 FutureTask

FutureTask是Runnable和Future的實現類,既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。

當線程池調用submit()方法來向線程池中提交任務時,無論提交的是Runnable類型的任務,還是提交的是Callable類型的任務,最終都是將任務封裝成一個FutureTask對象,我們可以通過這個FutureTask對象來獲取任務在線程池中的狀態。

public <T> Future<T> submit(Callable<T> task) {

if (task == null) throw new NullPointerException();

// 調用newTaskFor()將Callable任務封裝成一個FutureTask

RunnableFuture<T> ftask = newTaskFor(task);

// 執行任務

execute(ftask);

return ftask;

}

// newTaskFor

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {

// 直接new一個FutureTask對象

return new FutureTask<T>(callable);

}

2. FutureTask類結構

public class FutureTask<V> implements RunnableFuture<V> {

/** state變量用來保存任務的狀態 */

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

/** 提交的任務,Runnable類型的任務會通過Executors.callable()來轉變為Callable */

private Callable<V> callable;

/** 用來保存Callable的call()方法的返回值 */

private Object outcome;

/** 執行Callable任務的線程 **/

private volatile Thread runner;

/**

* 任務未完成時,調用get方法獲取結果的線程會阻塞等待

* waiters用於保存這些線程

*/

private volatile WaitNode waiters;

static final class WaitNode {

volatile Thread thread;

volatile WaitNode next;

WaitNode() { thread = Thread.currentThread(); }

}

}

3. FutureTask狀態

FutureTask任務的狀態如下:

// 任務的初始狀態,當新建一個FutureTask任務時,state值默認為NEW

private static final int NEW = 0;

// 任務處於完成中,也就是正在執行還未設置返回值

private static final int COMPLETING = 1;

// 任務正常被執行完成,並將任務的返回值賦值給outcome屬性之後

private static final int NORMAL = 2;

// 任務出了異常,並將異常對象賦值給outcome屬性之後

private static final int EXCEPTIONAL = 3;

// 調用cancle(false),任務被取消了

private static final int CANCELLED = 4;

// 調用cancle(true),任務中斷,但是在線程中斷之前

private static final int INTERRUPTING = 5;

// 調用cancle(true),任務中斷,但是在線程中斷之後

private static final int INTERRUPTED = 6;

狀態變化如下圖:

4. 執行任務run()

執行future.callable.call(),執行任務;執行成功,設置結果outcome;逐個喚醒waiters中的線程去獲取執行結果。

public void run() {

/*

* 1. 不是NEW狀態,不能執行

* 2. 設置runner失敗,不能執行

*/

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable<V> c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

result = c.call();// 真正執行任務

ran = true;// 執行成功,設置執行成功標誌

} catch (Throwable ex) {

result = null;

ran = false;// 有異常,執行失敗

setException(ex);// 設置異常

}

// 如果執行成功,則設置返回結果

if (ran)

set(result);

}

} finally {

runner = null;// 無論是否執行成功,把runner設置為null

int s = state;

// 處理中斷

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

/**

* 設置執行結果

*/

protected void set(V v) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 執行完成,設置COMPLETING狀態

outcome = v;// 設置執行結果

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 設置完結果,設置NORMAL狀態

finishCompletion();// 逐個喚醒waiters中的線程去獲取執行結果

}

}

5. 獲取任務返回值get()方法

任務狀態為NORMAL,直接返回執行結果;任務狀態為COMPLETING,線程yield()讓出CPU,因為COMPLETING到NORMAL只需要很短的時間,get線程讓出CPU的短暫時間,任務狀態就是從COMPLETING變成了NORMAL;任務狀態為NEW,將get線程阻塞,如果設置了超時,阻塞至超時時間;如果沒有設置超時,會一直阻塞直到任務完成後喚醒。

public V get() throws InterruptedException, ExecutionException {

int s = state;

// 如果狀態處於NEW或者COMPLETING狀態,表示任務還沒有執行完成,awaitDone()等待

if (s <= COMPLETING)

s = awaitDone(false, 0L);// 下文詳解

// 返回結果,下文詳解

return report(s);

}

/**

* 返回執行結果

*/

private V report(int s) throws ExecutionException {

Object x = outcome;

// 任務正常結束時,返回outcome

if (s == NORMAL)

return (V)x;

// 任務被取消了,拋出CancellationException

if (s >= CANCELLED)

throw new CancellationException();

// 這裡只能第EXCEPTIONAL狀態,表示在執行過程中出現了異常,拋出ExecutionException。

throw new ExecutionException((Throwable)x);

}

/**

* 處於NEW或者COMPLETING狀態時,get線程等待

*/

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

// ......

for (;;) {

// ......

// 任務處於COMPLETING中,就讓當前線程先暫時放棄CPU的執行權

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

// ......

// 如果設置了超時,阻塞至超時時間

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

// 等待一段時間

LockSupport.parkNanos(this, nanos);

}

else

// 如果沒有設置超時,會一直阻塞,直到被中斷或者被喚醒

LockSupport.park(this);

}

}

6. 取消任務 cancel()

將任務狀態設置成INTERRUPTING/INTERRUPTED/CANCELLED狀態就表示取消了線程,因為在這些狀態下任務的run方法是不能執行的。

public boolean cancel(boolean mayInterruptIfRunning) {

/*

* 以下情況不能取消任務:

* 1. 當前任務不是NEW狀態,已經被執行了,不能取消

* 2. 當前任務還沒有執行,state == NEW,但是CAS設置狀態失敗,不能取消

*/

if (!(state == NEW &&

UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

return false;

try { // in case call to interrupt throws exception

// 中斷

if (mayInterruptIfRunning) {

try {

Thread t = runner;

if (t != null)

t.interrupt();// 中斷線程

} finally { // final state

// 中斷之後,設置INTERRUPTED狀態

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

}

}

} finally {

finishCompletion();// 喚醒waiters中的線程去獲取執行結果

}

return true;

}

相關焦點

  • 原創】Java並發編程系列01|開篇獲獎感言
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫為什麼要學並發編程我曾聽一個從事15年開發工作的技術人員說過,他剛工作時的並發編程第一原則就是不要寫並發程序。
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫寫在前面編寫並發程序是比較困難的,因為並發程序極易出現Bug,這些Bug有都是比較詭異的,很多都是沒辦法追蹤,而且難以復現。
  • 「原創」Java並發編程系列33|深入理解線程池(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程必不可少的線程池,接下來分兩篇文章介紹線程池,本文是第一篇。介紹1.1 使用場景並發編程可以高效利用CPU資源,提升任務執行效率,但是多線程及線程間的切換也伴隨著資源的消耗。當遇到單個任務處理時間比較短,但需要處理的任務數量很大時,線程會頻繁的創建銷毀,大量的時間和資源都會浪費在線程的創建和銷毀上,效率很低。
  • 「原創」Java並發編程系列28|Copy-On-Write容器
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫正文前面兩篇講了並發編程中線程安全HashMap:ConcurrentHashMap
  • 「原創」Java並發編程系列18|讀寫鎖(下)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 18 篇,文末有本系列文章匯總。上篇為【原創】Java並發編程系列17 | 讀寫鎖八講(上),沒看過的可以先看看。本文是下篇,從「源碼分析寫鎖的獲取與釋放」開始。7.
  • 「原創」Java並發編程系列06|你不知道的final
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫final在Java中是一個保留的關鍵字,可以修飾變量、方法和類。那麼fianl在並發編程中有什麼作用呢?本文就在對final常見應用總結基礎上,講解final並發編程中的應用。
  • 「原創」Java並發編程系列29|ConcurrentLinkedQueue
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫J.U.C 為常用的集合提供了並發安全的版本,前面講解了 map 的並發安全集合 ConcurrentHashMap,List 並發安全集合 CopyOnWriteArrayList,Set 並發安全集合
  • 「原創」Java並發編程系列14|AQS源碼分析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 14 篇,文末有本系列文章匯總。AbstractQueuedSynchronizer是Java並發包java.util.concurrent的核心基礎組件,是實現Lock的基礎。
  • JAVA並發編程:並發問題的根源及主要解決方法
    而在java中,不變性變量即通過final修飾的變量,如String,Long,Double等類型都是Immutability的,它們的內部實現都是基於final關鍵字的。那這又和並發編程有什麼關係呢?其實啊,並發問題很大部分原因就是因為線程切換破壞了原子性,這又導致線程隨意對變量的讀寫破壞了數據的一致性。
  • Python 3.8異步並發編程
    有效的提高程序執行效率的兩種方法是異步和並發,Golang,node.js之所以可以有很高執行效率主要是他們的協程和異步並發機制。實際上異步和並發是每一種現代語言都在追求的特性,當然Python也不例外,今天我們就講講Python 3中的異步並發編程。
  • 「原創」Java並發編程系列03|重排序-可見性和有序性問題根源
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫思維導圖寫在前面並發編程的三大問題:原子性、可見性、有序性。從java原始碼到最終實際執行的指令序列,會分別經歷下面三種重排序:編譯器優化的重排序。編譯器在不改變單線程程序語義的前提下,可以重新安排語句的執行順序。指令級並行的重排序。
  • NO.001- 簡說 Java 並發編程史
    這篇文章是Java並發編程思想系列的第一篇,主要從理解Java並發編程歷史的原因和Java並發演進過程兩部分,以極簡地回溯並發編程的歷史,幫助大家從歷史這個角度去了解一門語言一個特性的演進。對歷史理解的越多,思考的越多,未來的方向就會更加堅定。我是誰?從哪來?到哪去?
  • Java並發編程系列20|StampedLock源碼解析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 20 篇,文末有本系列文章匯總。上一篇介紹了StampedLock存在的意義以及如何使用,按照這個系列的風格大家也應該猜到了,這一篇就是的源碼分析。
  • Java的synchronized 能防止指令重排序嗎?
    「二胖」:好吧,你既然這麼好奇,那我就大概說下吧,你搬上小板凳仔細挺好了哦。我要開始我的表演了。下面二胖第一面開始了。「面試官」:二胖是吧,先做個自我介紹吧。「二胖」:好的,我叫二胖,我來自長沙,今年25歲,從事java開發快3年了,現在在XX公司XX事業部擔任高級「java」開發工程師,主要負責XX系統。。。。。
  • Java並發編程系列23|循環屏障CyclicBarrier
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:
  • 動力節點Java學院2021年Java學習路線圖最新出爐啦
    Java在程式語言排行榜中一直牢牢佔據榜首位置,幾乎所有的大中型網際網路的應用系統在伺服器端開發首選都是Java編程,正因如何吸引這不少年輕人投入該行業,Java雖不想其它程式語言那麼複雜,但是知識體系還是很龐大的,因此想要學好並非容易之事,不少想要跨入Java編程行業的同學們通過網絡搜索各式各樣的學習資料
  • Java並發編程系列21|Condition-Lock的等待通知
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫我們知道 synchronized 鎖通過 Object 類的 wait()和 notify()方法實現線程間的等待通知機制,而比 synchronized 更靈活 Lock 鎖同樣也有實現等待通知機制的方式
  • 「原創」JVM系列03|Java棧—方法是如何調用的?
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文是何適 JVM 修仙系列第 3 篇,文末有本系列文章匯總。 e.printStackTrace(); } }}設置參數-Xss128K 執行以上代碼,結果如下:調用深度count=1088java.lang.StackOverflowError
  • Alibaba架構師從零開始,一步一步帶你進入並發編程的世界
    簡介並發與線程安全的基本概念構建與組合線程安全類的技術如何利用java.util.concurrent中的並發構建塊如果你是一名並發編程初學者,建議按照順序閱讀本書,並按照書中的例子進行編碼和實戰。如果你有一定的並發編程經驗,可以把本書當做一個手冊, 直接看需要學習的章節。以下是各章節的基本介紹。第l章介紹Java並發編程的挑戰,向讀者說明進入並發編程的世界可能會遇到哪些問題,以及如何解決。