Java【多線程系列】JUC線程池—2. 原理(二)、Callable和Future

2021-02-19 人人有架設

在"Java多線程系列--「基礎篇」01之 基本概念"中,我們介紹過,線程有5種狀態:新建狀態,就緒狀態,運行狀態,阻塞狀態,死亡狀態。線程池也有5種狀態;然而,線程池不同於線程,線程池的5種狀態是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED

線程池狀態定義代碼如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }

說明:
ctl是一個AtomicInteger類型的原子對象。ctl記錄了"線程池中的任務數量"和"線程池狀態"2個信息。
ctl共包括32位。其中,高3位表示"線程池狀態",低29位表示"線程池中的任務數量"。

狀態說明RUNNING對應的高3位值是111SHUTDOWN對應的高3位值是000STOP對應的高3位值是001TIDYING對應的高3位值是010TERMINATED對應的高3位值是011

線程池各個狀態之間的切換如下圖所示:

1. RUNNING

(01) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態!道理很簡單,在ctl的初始化代碼中(如下),就將它初始化為RUNNING狀態,並且"任務數量"初始化為0。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2. SHUTDOWN

(01) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(02) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。

3. STOP

(01) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(02) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

4. TIDYING

(01) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(02) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。

當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。

5. TERMINATED

(01) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(02) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED。

6. 拒絕策略介紹

線程池的拒絕策略,是指當任務添加到線程池中被拒絕,而採取的處理措施。

當任務添加到線程池中之所以被拒絕,可能是由於:
第一,線程池異常關閉。
第二,任務數量超過線程池的最大限制。

線程池共包括4種拒絕策略,它們分別是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy

策略說明AbortPolicy當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常CallerRunsPolicy當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務DiscardOldestPolicy當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,然後將被拒絕的任務添加到等待隊列中DiscardPolicy當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務

線程池默認的處理策略是AbortPolicy

7. 拒絕策略對比和示例

下面通過示例,分別演示線程池的4種拒絕策略。
(01) DiscardPolicy 示例
(02) DiscardOldestPolicy 示例
(03) AbortPolicy 示例
(04) CallerRunsPolicy 示例

7.1 DiscardPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public class DiscardPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}

運行結果:

task-0 is running.task-1 is running.

結果說明:線程池pool的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),這意味著"線程池能同時運行的任務數量最大只能是1"。
線程池pool的阻塞隊列是ArrayBlockingQueue,ArrayBlockingQueue是一個有界的阻塞隊列,ArrayBlockingQueue的容量為1。這也意味著線程池的阻塞隊列只能有一個線程池阻塞等待。
根據""中分析的execute()代碼可知:線程池中共運行了2個任務。第1個任務直接放到Worker中,通過線程去執行;第2個任務放到阻塞隊列中等待。其他的任務都被丟棄了!

7.2 DiscardOldestPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
public class DiscardOldestPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}

運行結果:

task-0 is running.task-9 is running.

結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為DiscardOldestPolicy之後,當有任務添加到線程池被拒絕時,線程池會丟棄阻塞隊列中末尾的任務,然後將被拒絕的任務添加到末尾。

7.3 AbortPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;import java.util.concurrent.RejectedExecutionException;
public class AbortPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } } catch (RejectedExecutionException e) { e.printStackTrace(); pool.shutdown(); } }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}

(某一次)運行結果:

java.util.concurrent.RejectedExecutionException    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)    at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0 is running.task-1 is running.

結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為AbortPolicy之後,當有任務添加到線程池被拒絕時,會拋出RejectedExecutionException。

7.4 CallerRunsPolicy 示例

import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
public class CallerRunsPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); }
pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}

運行結果:

task-2 is running.task-3 is running.task-4 is running.task-5 is running.task-6 is running.task-7 is running.task-8 is running.task-9 is running.task-0 is running.task-1 is running.

結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為CallerRunsPolicy之後,當有任務添加到線程池被拒絕時,線程池會將被拒絕的任務添加到"線程池正在運行的線程"中取運行。

8 Callable 和 Future 簡介

Callable 和 Future 是比較有趣的一對組合。當我們需要獲取線程的執行結果時,就需要用到它們。Callable用於產生結果,Future用於獲取結果。

8.1 Callable

Callable 是一個接口,它只包含一個call()方法。Callable是一個返回結果並且可能拋出異常的任務。

為了便於理解,我們可以將Callable比作一個Runnable接口,而Callable的call()方法則類似於Runnable的run()方法。

Callable的源碼如下:

public interface Callable<V> {    V call() throws Exception;}

說明:從中我們可以看出Callable支持泛型。

8.2 Future

Future 是一個接口。它用於表示異步計算的結果。提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

Future的源碼如下:

public interface Future<V> {        boolean     cancel(boolean mayInterruptIfRunning)
boolean isCancelled()
boolean isDone()
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}

說明:Future用於表示異步計算的結果。它的實現類是FutureTask,在講解FutureTask之前,我們先看看Callable, Future, FutureTask它們之間的關係圖,如下:

說明:
(01) RunnableFuture是一個接口,它繼承了Runnable和Future這兩個接口。RunnableFuture的源碼如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {    void run();}

(02) FutureTask實現了RunnableFuture接口。所以,我們也說它實現了Future接口。

9 示例和源碼分析

我們先通過一個示例看看Callable和Future的基本用法,然後再分析示例的實現原理。

import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;import java.util.concurrent.ExecutionException;
class MyCallable implements Callable {
@Override public Integer call() throws Exception { int sum = 0; for (int i=0; i<100; i++) sum += i; return Integer.valueOf(sum); } }
public class CallableTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException{ ExecutorService pool = Executors.newSingleThreadExecutor(); Callable c1 = new MyCallable(); Future f1 = pool.submit(c1); System.out.println(f1.get()); pool.shutdown(); }}

運行結果:

結果說明:在主線程main中,通過newSingleThreadExecutor()新建一個線程池。接著創建Callable對象c1,然後再通過pool.submit(c1)將c1提交到線程池中進行處理,並且將返回的結果保存到Future對象f1中。然後,我們通過f1.get()獲取Callable中保存的結果;最後通過pool.shutdown()關閉線程池。

9.1 submit()

submit()在java/util/concurrent/AbstractExecutorService.java中實現,它的源碼如下:

public <T> Future<T> submit(Callable<T> task) {    if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;}

說明:submit()通過newTaskFor(task)創建了RunnableFuture對象ftask。它的源碼如下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {    return new FutureTask<T>(callable);}

9.2. FutureTask的構造函數

FutureTask的構造函數如下:

public FutureTask(Callable<V> callable) {    if (callable == null)        throw new NullPointerException();        this.callable = callable;        this.state = NEW;       }

9.3 FutureTask的run()方法

我們繼續回到submit()的源碼中。

在newTaskFor()新建一個ftask對象之後,會通過execute(ftask)執行該任務。此時ftask被當作一個Runnable對象進行執行,最終會調用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實現,源碼如下:

public void run() {    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return;    try {                Callable<V> c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {                                result = c.call();                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;                setException(ex);            }                        if (ran)                set(result);        }    } finally {        runner = null;                int s = state;        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);    }}

說明:run()中會執行Callable對象的call()方法,並且最終將結果保存到result中,並通過set(result)將result保存。
之後調用FutureTask的get()方法,返回的就是通過set(result)保存的值。

相關焦點

  • Java是如何實現Future模式的?萬字詳解!
    在這一過程中,這一系列的單號都是我們收貨的重要憑證。因此,JDK的Future就類似於我們網購買東西的單號,當我們執行某一耗時的任務時,我們可以另起一個線程異步去執行這個耗時的任務,同時我們可以幹點其他事情。當事情幹完後我們再根據future這個"單號"去提取耗時任務的執行結果即可。因此Future也是多線程中的一種應用模式。
  • 概述:JVM內存模型、線程隔離數據區、線程共享數據區
    在程序運行的這一過程中,jvm會將其管理的內存空間劃分為不同的區域,這些區域各有各的用途,我們將其分為五類:方法區堆虛擬機棧本地方法棧程序計數器其中方法區和堆是線程共享的,隨jvm啟動和停止而創建和銷毀;而虛擬機棧、本地方法棧和程序計數器則是線程私有的,隨線程的創建和結束而創建和銷毀。
  • 實現多線程的標準操作,基於Runnable接口實現java多線程
    1 為什麼要用Runnable上一篇文章介紹了通過繼承Thread類,實現java多線程。但如果當我們創建的這個線程類還想繼承其他類的時候,這種方法就略顯局限了。這也是java單繼承的局限性。為了避免這種局限性,所以又提供了第二種多線程主體定義的形式:實現Runnable接口。2 創建一個實現Runnable的對象我們先創建一個RunnableDemo類,並在裡面創建一個MyThread2內部類,MyThread2實現Runnable接口。
  • 看了線程和線程池的對比,才知道池化技術到底有多牛
    生活案例 2小美是一家公司的 HR,每年年初是小美最頭疼的日子了。因為年初有大量的員工離職,因此小美需要一邊辦理離職員工的手續,一邊瘋狂的招人,除了這些工作之外,小美還要忍受來自各部門和大 BOSS 的間歇性催促,這些都讓小美痛苦不已。於是為了應對每年年初的這種囧境,小美也變聰明了,她每年年末的時候都會預先招聘一些員工,以備來年的不時之需。
  • 新手編程:Java多線程中Thread與Runnable的區別
    Java多線程中Thread與Runnable的區別定義extends Thread子類繼承Thread具備多線程能力,可以實現多線程;啟動線程的方法:①創建子類對象+i);}}}上述示例執行後可以看到:主線程和子線程的執行順序本該是「先執行子線程、再執行主線程」,事實上卻是主線程和子線程交替執行(無法人為控制,由CPU調度執行),這便是多線程的特徵;繼承Thread
  • 通俗易懂的告訴你「策略模式」在java多線程中的應用
    花10分鐘認真的閱讀一篇文章有時或許比敲60分鐘代碼還有效我們都知道java啟動多線程有兩種方式,一種是繼承Thread類,一種是實現Runnable接口,但是很多小夥伴可能不知道實現Runnable接口這種方式中運用了
  • Java 實現線程的方式有幾種方式?帶有返回值的線程怎麼實現?
    Java 實現線程的方式有幾種方式?帶有返回值的線程怎麼實現?在Java線程開發中,有幾種方法開啟線程?假如需要得到線程返回的信息怎麼辦?可以實現嗎?凱哥將通過源碼和大家一起分享下線程怎麼將返回值帶回來的。
  • 三萬字總結最全Java線程池源碼面試題
    1 為什麼要用線程池1.1 線程the more, the better?1、線程在java中是一個對象,更是作業系統的資源,線程創建、銷毀都需要時間。如果創建時間+銷毀時間>執行任務時間就很不合算。
  • Java 多線程基礎 interrupt()和線程終止方式
    二、線程終止方式Thread中的 stop() 和 suspend() 方法,由於固有的不安全性,已經建議不再使用!下面,我先分別討論線程在「阻塞狀態」和「運行狀態」的終止方式,然後再總結出一個通用的方式。(一)、終止處於「阻塞狀態」的線程.通常,我們通過「中斷」方式終止處於「阻塞狀態」的線程。
  • java入門避坑必讀,通過Thread類創建java多線程
    欲善編程,多看、敲、討論;動眼、手、大腦。1 為什麼要用多線程平常我們做crud的時候,用到多線程的機會不多。但當我們要處理一些複雜的業務時,或者提高程序處理效率時,就繞不開多線程的使用。也有些時候,我們需要對某個接口進行並發測試,也可以通過多線程來做一個性能測試小程序。2 創建一個java線程類我們先創建一個ThreadDemo類,並在裡面創建一個MyThread內部類,MyThread繼承Thread類。繼承之後,MyThread就是一個線程類了,具備了線程類的所有屬性。
  • Java多線程:帶你了解神秘的線程變量 ThreadLocal
    前言在 Java多線程中,線程變量ThreadLocal非常重要,但對於很多開發者來說,這並不容易理解,甚至覺得有點神秘今天,我將獻上一份 ThreadLocal的介紹 & 實戰攻略,1:線程1的threadLocal線程2:線程2的threadLocal
  • Java 內存模型與線程
    二.核心知識點歸納2.1 概述Q1:多任務處理的必要性充分利用計算機處理器的能力,避免處理器在磁碟2.2.1 設計目的屏蔽掉各種硬體和作業系統的內存訪問差異,實現 Java 程序在各種平臺下都能達到一致的內存訪問效果2.2.2 設計方法
  • 淺談Java線程狀態轉換及控制
    此時的線程就是一個在堆中分配了內存的靜態的對象,線程的執行體(run方法的代碼)不會被執行。  當調用了線程對象的start()方法後,該線程就處於就緒狀態。此時該線程並沒有開始運行,而是處於可運行池中,Java虛擬機會為該線程創建方法調用棧和程序計數器。至於該線程何時才能運行,要取決於JVM的調度。
  • Java項目實踐,CountDownLatch實現多線程閉鎖
    摘要本文主要介紹Java多線程並發中閉鎖(Latch)的基本概念、原理、示例代碼、應用場景,通過學習,可以掌握多線程並發時閉鎖(Latch)的使用方法。概念「閉鎖」就是指一個被鎖住了的門將線程a擋在了門外(等待執行),只有當門打開後(其他線程執行完畢),門上的鎖才會被打開,a才能夠繼續執行。
  • 「原創」Java並發編程系列07|synchronized原理
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程中用到最多的關鍵字毫無疑問是synchronized。這篇文章就來探究下synchronized:synchronized如何使用?
  • JVM內存區域之線程私有區域
    我們可以把windows的CPU+緩存+主內存和JVM的執行引擎+操作數棧+(棧、堆)對應起來,這樣更加利於我們去理解JVM。虛擬機棧:從上圖可見,java虛擬機棧是線程私有的,它的生命周期和線程相同。
  • Java創建線程安全的單例singleton
    Java創建線程安全的單例單例的使用場景JVM中僅需要一個實例,因此能節省內存,加快訪問速度,比如資料庫連接池,計數器等2、多線程下的安全單例,第一種方式就是方法加鎖來實現,比如添加關鍵字synchronized。
  • Java從零開始學 - 第25天:掌握JUC中的阻塞隊列
    BlockingQueue接口BlockingQueue位於juc中,熟稱阻塞隊列, 阻塞隊列首先它是一個隊列,繼承Queue接口,是隊列就會遵循先進先出(FIFO)的原則,又因為它是阻塞的,故與普通的隊列有兩點區別:當一個線程向隊列裡面添加數據時,如果隊列是滿的,那麼將阻塞該線程,暫停添加數據當一個線程從隊列裡面取出數據時,如果隊列是空的,那麼將阻塞該線程
  • 程式設計師:還不知道怎么正確地停止線程?
    前言線程在後臺開發中會經常用到,那你每次關閉線程都正確嗎?特別是目前多線程開發場景越來越多,給程式設計師造成的困擾就更多了。今天我們就來了了如何正確停止線程。如何正確地停止線程?一個線程一般都是正常的運行直到執行任務完畢而結束。那什麼時候我們需要對線程進行停止的動作呢?
  • 並發編程——多線程計數的更優解:LongAdder原理分析
    2.LongAdder原理的直觀理解為了更好地對源碼進行分析,我們需要先從直覺上理解它的原理,否則直接看代碼的話會一臉懵逼LongAdder的計數主要分為2個對象一個long類型的欄位:base一個Cell