在"Java多線程系列--「基礎篇」01之 基本概念"中,我們介紹過,線程有5種狀態:新建狀態,就緒狀態,運行狀態,阻塞狀態,死亡狀態。線程池也有5種狀態;然而,線程池不同於線程,線程池的5種狀態是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。
線程池狀態定義代碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }說明:
狀態說明RUNNING對應的高3位值是111SHUTDOWN對應的高3位值是000STOP對應的高3位值是001TIDYING對應的高3位值是010TERMINATED對應的高3位值是011
ctl是一個AtomicInteger類型的原子對象。ctl記錄了"線程池中的任務數量"和"線程池狀態"2個信息。
ctl共包括32位。其中,高3位表示"線程池狀態",低29位表示"線程池中的任務數量"。線程池各個狀態之間的切換如下圖所示:
1. RUNNING(01) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態!道理很簡單,在ctl的初始化代碼中(如下),就將它初始化為RUNNING狀態,並且"任務數量"初始化為0。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));2. SHUTDOWN
(01) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
3. STOP
(02) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。(01) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
4. TIDYING
(02) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。(01) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(02) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5. TERMINATED(01) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
6. 拒絕策略介紹
(02) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED。線程池的拒絕策略,是指當任務添加到線程池中被拒絕,而採取的處理措施。
當任務添加到線程池中之所以被拒絕,可能是由於:
第一,線程池異常關閉。
第二,任務數量超過線程池的最大限制。線程池共包括4種拒絕策略,它們分別是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy。
策略說明AbortPolicy當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常CallerRunsPolicy當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務DiscardOldestPolicy當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,然後將被拒絕的任務添加到等待隊列中DiscardPolicy當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務線程池默認的處理策略是AbortPolicy!
7. 拒絕策略對比和示例下面通過示例,分別演示線程池的4種拒絕策略。
7.1 DiscardPolicy 示例
(01) DiscardPolicy 示例
(02) DiscardOldestPolicy 示例
(03) AbortPolicy 示例
(04) CallerRunsPolicy 示例import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public class DiscardPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}運行結果:
task-0 is running.task-1 is running.結果說明:線程池pool的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),這意味著"線程池能同時運行的任務數量最大只能是1"。
7.2 DiscardOldestPolicy 示例
線程池pool的阻塞隊列是ArrayBlockingQueue,ArrayBlockingQueue是一個有界的阻塞隊列,ArrayBlockingQueue的容量為1。這也意味著線程池的阻塞隊列只能有一個線程池阻塞等待。
根據""中分析的execute()代碼可知:線程池中共運行了2個任務。第1個任務直接放到Worker中,通過線程去執行;第2個任務放到阻塞隊列中等待。其他的任務都被丟棄了!import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
public class DiscardOldestPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}運行結果:
task-0 is running.task-9 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為DiscardOldestPolicy之後,當有任務添加到線程池被拒絕時,線程池會丟棄阻塞隊列中末尾的任務,然後將被拒絕的任務添加到末尾。
7.3 AbortPolicy 示例import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;import java.util.concurrent.RejectedExecutionException;
public class AbortPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } } catch (RejectedExecutionException e) { e.printStackTrace(); pool.shutdown(); } }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}(某一次)運行結果:
java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0 is running.task-1 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為AbortPolicy之後,當有任務添加到線程池被拒絕時,會拋出RejectedExecutionException。
7.4 CallerRunsPolicy 示例import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
public class CallerRunsPolicyDemo {
private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); }
pool.shutdown(); }}
class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}運行結果:
task-2 is running.task-3 is running.task-4 is running.task-5 is running.task-6 is running.task-7 is running.task-8 is running.task-9 is running.task-0 is running.task-1 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為CallerRunsPolicy之後,當有任務添加到線程池被拒絕時,線程池會將被拒絕的任務添加到"線程池正在運行的線程"中取運行。
8 Callable 和 Future 簡介Callable 和 Future 是比較有趣的一對組合。當我們需要獲取線程的執行結果時,就需要用到它們。Callable用於產生結果,Future用於獲取結果。
8.1 CallableCallable 是一個接口,它只包含一個call()方法。Callable是一個返回結果並且可能拋出異常的任務。
為了便於理解,我們可以將Callable比作一個Runnable接口,而Callable的call()方法則類似於Runnable的run()方法。
Callable的源碼如下:
public interface Callable<V> { V call() throws Exception;}說明:從中我們可以看出Callable支持泛型。
8.2 FutureFuture 是一個接口。它用於表示異步計算的結果。提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。
Future的源碼如下:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning)
boolean isCancelled()
boolean isDone()
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}說明:Future用於表示異步計算的結果。它的實現類是FutureTask,在講解FutureTask之前,我們先看看Callable, Future, FutureTask它們之間的關係圖,如下:
說明:
(01) RunnableFuture是一個接口,它繼承了Runnable和Future這兩個接口。RunnableFuture的源碼如下:public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}(02) FutureTask實現了RunnableFuture接口。所以,我們也說它實現了Future接口。
9 示例和源碼分析我們先通過一個示例看看Callable和Future的基本用法,然後再分析示例的實現原理。
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;import java.util.concurrent.ExecutionException;
class MyCallable implements Callable {
@Override public Integer call() throws Exception { int sum = 0; for (int i=0; i<100; i++) sum += i; return Integer.valueOf(sum); } }
public class CallableTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException{ ExecutorService pool = Executors.newSingleThreadExecutor(); Callable c1 = new MyCallable(); Future f1 = pool.submit(c1); System.out.println(f1.get()); pool.shutdown(); }}運行結果:
結果說明:在主線程main中,通過newSingleThreadExecutor()新建一個線程池。接著創建Callable對象c1,然後再通過pool.submit(c1)將c1提交到線程池中進行處理,並且將返回的結果保存到Future對象f1中。然後,我們通過f1.get()獲取Callable中保存的結果;最後通過pool.shutdown()關閉線程池。
9.1 submit()submit()在java/util/concurrent/AbstractExecutorService.java中實現,它的源碼如下:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}說明:submit()通過newTaskFor(task)創建了RunnableFuture對象ftask。它的源碼如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}9.2. FutureTask的構造函數FutureTask的構造函數如下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }9.3 FutureTask的run()方法我們繼續回到submit()的源碼中。
在newTaskFor()新建一個ftask對象之後,會通過execute(ftask)執行該任務。此時ftask被當作一個Runnable對象進行執行,最終會調用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實現,源碼如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}說明:run()中會執行Callable對象的call()方法,並且最終將結果保存到result中,並通過set(result)將result保存。
之後調用FutureTask的get()方法,返回的就是通過set(result)保存的值。