每日英文
Remember every who treat you well coz they are not responsible to do so.
要記住每一個對你好的人, 因為他們本可以不這麼做的。
每日掏心話
有時候你把什麼放下了,不是因為突然就捨得了,是因為期限到了,任性夠了,成熟多了,也就知道這一頁該翻過去了。
來自:劍走偏鋒雨 | 責編:樂樂
正文
0
序言
在Java中,使用線程來異步執行任務。Java線程的創建與銷毀需要一定的開銷,如果我們為每一個任務創建一個新線程來執行,這些線程的創建和銷毀將消耗大量的計算資源。針對這種情況,我們需要使用線程池來管理線程,帶來的好處有3個:
① 降低資源消耗。通過重複利用已創建的線程降低線程創建和銷毀造成的消耗。
② 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
③ 提高線程的可管理性。線程是稀缺資源,不能無限制創建,否則不但會消耗資源,還會降低系統的穩定性,而使用線程池可以進行統一分配、調優和監控。而這些離不開對線程池原理的深入了解。
本篇文章會從線程池的分類、線程池的創建、向線程池提交任務、關閉線程池、配置線程池、線程池的監控、線程池的實現原理七個方面講解線程池。
1
線程池的分類
想知道線程池的分類,可以看看線程池工廠類Executors的靜態方法,部分代碼如下;
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
從以上代碼可知:線程池分為5種,分別是FixedThreadPool、SingleThreadExecutor、CachedThreadPool、SingleThreadScheduledExecutor、ScheduledThreadPool。其中前3個線程池屬於ThreadPoolExecutor類型,後2個線程池屬於ScheduledThreadPoolExecutor類型。
2
線程池的創建
從線程池的分類,我們得知線程池工廠類Executors創建了兩種類型的線程池,分別是ThreadPoolExecutor類型和ScheduledThreadPoolExecutor類型。我們看下ScheduledThreadPoolExecutor的構造方法:
public ScheduledThreadPoolExecutor(int var1) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}
public ScheduledThreadPoolExecutor(int var1, RejectedExecutionHandler var2) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
}
public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2, RejectedExecutionHandler var3) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2, var3);
}
從以上代碼得知,ScheduledThreadPoolExecutor構造方法調用的是父類的構造方法,那它的父類是誰呢?
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
原來ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,原來ScheduledThreadPoolExecutor的創建實際上是通過父類ThreadPoolExecutor來創建的,只是調用的構造方法中的參數不同,最明顯的就是阻塞隊列用的是DelayedWorkQueue。
我們可以看到ThreadPoolExecutor是一個核心類,線程池的創建都離不開它,所以這裡我們通過ThreadPoolExecutor創建一個線程池。這裡只需要new一個ThreadPoolExecutor即可,不過在new之前,我們要先看下它的構造方法:
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue var6) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue var6, ThreadFactory var7) {
this(var1, var2, var3, var5, var6, var7, defaultHandler);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue var6, RejectedExecutionHandler var7) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7);
}
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue var6, ThreadFactory var7, RejectedExecutionHandler var8) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if (var6 != null && var7 != null && var8 != null) {
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
會發現前三個構造方法調用的都是最後一個構造方法,那每個構造方法參數都代表什麼呢?從代碼var1、var2、var3我們看不出所指代的內容,所以我們看下文檔:
https://developer.android.google.cn/reference/java/util/concurrent/ThreadPoolExecutor
構造方法
參數列表
① corePoolSize
顧名思義,其指代核心線程的數量。當提交一個任務到線程池時,線程池會創建一個核心線程來執行任務,即使其他空閒的核心線程能夠執行新任務也會創建新的核心線程,而等到需要執行的任務數大於線程池核心線程的數量時就不再創建,這裡也可以理解為當核心線程的數量等於線程池允許的核心線程最大數量的時候,如果有新任務來,就不會創建新的核心線程。
如果你想要提前創建並啟動所有的核心線程,可以調用線程池的prestartAllCoreThreads()方法。
② maximumPoolSize
顧名思義,其指代線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。所以只有隊列滿了的時候,這個參數才有意義。因此當你使用了無界任務隊列的時候,這個參數就沒有效果了。
③ keepAliveTime
顧名思義,其指代線程活動保持時間,即當線程池的工作線程空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率,不然線程剛執行完一個任務,還沒來得及處理下一個任務,線程就被終止,而需要線程的時候又再次創建,剛創建完不久執行任務後,沒多少時間又終止,會導致資源浪費。
注意:這裡指的是核心線程池以外的線程。還可以設置allowCoreThreadTimeout = true這樣就會讓核心線程池中的線程有了存活的時間。
④ TimeUnit
顧名思義,其指代線程活動保持時間的單位:可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
⑤ workQueue
顧名思義,其指代任務隊列:用來保存等待執行任務的阻塞隊列。
⑥ threadFactory
顧名思義,其指代創建線程的工廠:可以通過線程工廠給每個創建出來的線程設置更加有意義的名字。
⑦ RejectedExecutionHandler
顧名思義,其指代拒絕執行程序,可以理解為飽和策略:當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK1.5中Java線程池框架提供了以下4種策略。
AbortPolicy:直接拋出異常RejectedExecutionException。
CallerRunsPolicy:只用調用者所在線程來運行任務,即由調用 execute方法的線程執行該任務。
DiscardOldestPolicy:丟棄隊列裡最近的一個任務,並執行當前任務。
DiscardPolicy:不處理,丟棄掉,即丟棄且不拋出異常。
到此,我們學會了ThreadPoolExecutor的構造方法的參數列表每個參數的含義,也就知道了如何去創建一個線程池。
3
向線程池提交任務
可以使用兩個方法向線程池提交任務,分別是execute()和submit()方法。
execute
public class Test implements Runnable {
@Override
public void run() {
System.out.println("現在的Thread id :" + Thread.currentThread().getName());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5,6000,TimeUnit.MILLISECONDS,queue);
for (int i = 0; i < 7 ; i ++){
Runnable runnable = new Test();
executor.execute(runnable);
}
executor.shutdown();
}
}
現在的Thread id :pool-1-thread-1
現在的Thread id :pool-1-thread-2
現在的Thread id :pool-1-thread-3
現在的Thread id :pool-1-thread-1
現在的Thread id :pool-1-thread-2
現在的Thread id :pool-1-thread-3
現在的Thread id :pool-1-thread-1
你會發現雖然我們創建了7個任務,但是只有三個線程在執行。因為我們的任務隊列的個數是10個,當任務隊列沒有滿的時候,任務會放在任務隊列中。顯然3個由核心線程處理,剩下的7個會放在任務隊列。這裡任務隊列還沒有滿,任務會放在任務隊列中。
只有任務隊列滿了,而且線程池未滿的時候,才會創建新的額外的線程去處理任務。這部分的知識會在線程池原理小節講解。
submit
public class Test implements Callable {
public static void main(String[] args) {
Test test = new Test();
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future submit = executorService.submit(test);
try {
Object o = submit.get();
System.out.println(o);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
@Override
public Object call() throws Exception {
for (int i = 0; i <10 ; i++) {
Thread.sleep(1000);
}
return true;
}
}
true
submit需要和Callable一起使用:
關於Callable創建線程的方式不了解的可以閱讀我的這篇文章:https://www.jianshu.com/p/1adedd2b2727
execute方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功;submit方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get方法來獲取返回值,get方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時候有可能任務沒有執行完。
public class Test implements Callable {
public static void main(String[] args) {
Test test = new Test();
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future submit = executorService.submit(test);
try {
Object o = submit.get(1,TimeUnit.SECONDS);
System.out.println(o);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
@Override
public Object call() throws Exception {
for (int i = 0; i <10 ; i++) {
Thread.sleep(1000);
}
return true;
}
}
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at Test.main(Test.java:11)
Process finished with exit code 0
當設置1秒後返回結果,但是線程池的任務還沒有執行完,會報超時異常。捕獲異常,在裡面處理邏輯即可。具體這部分的講解會在後續的文章中詳細說明,這裡只需要知道線程池執行任務的兩種方法即可。
4
關閉線程池
有兩個方法可以執行關閉線程池的操作,分別是shutdown和shutdownNow方法。
原理:遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。
區別:showdownNow首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而showdown只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都關閉後,才表示線程池關閉成功,這時調用isTerminated方法會返回true。至於應該調用哪一個方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。
5
配置線程池
要想合理的配置線程池,首先要分析任務的特性,可以從以下幾個角度來分析:
① 任務的性質:
性質不同的任務可以用不同規模的線程池分開處理:
CPU密集型任務,即計算型任務,如搜索、排序,佔用CPU資源較多,應配置儘可能少的線程,因為線程越多,花在任務切換上的時間就越多,效率越低。線程數建議配置N +1 ,N指的是CPU的核數。
IO密集型任務,即網絡請求,讀寫內存的任務,如WEB應用,佔用CPU資源較少(因為大部分的時間,CPU都在等待IO操作的完成),應配置儘可能多的線程,因為線程越多,IO的速度越快,效率越高。線程數建議配置2×N,N指的是CPU的核數。
② 任務的優先級
優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先執行。
③ 任務的執行時間
執行時間不同的任務可以交給不同規模的線程池來處理,或者使用優先級隊列,讓執行時間短的任務先執行。
④ 任務的依賴性
是否依賴其他系統資源,如資料庫連接。依賴資料庫連接的任務,因為線程提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。
在配置線程池的時候,建議使用有界隊列。
有界隊列能增加系統的穩定性和預警能力,可以根據需求設大一點,比如幾千。比如如果資料庫出現了問題,線程訪問資料庫緩慢就會導致線程阻塞,從而導致任務隊列和線程池滿,這個時候如果設置有界隊列,就可以通過拋出的異常發現問題,如果設置無界隊列,線程池的隊列中的任務會越積越多,有可能會撐滿內存,導致整個系統崩潰。
6
線程池的監控
可以自定義線程池並通過提供的參數進行線程池的監控:
① taskCount:線程池需要執行的任務數量
executor.getTaskCount();
② completedTaskCount:線程池已完成的任務數量,小於等於taskCount
executor.getCompletedTaskCount();
③ largestPoolSize:線程池曾經創建過的最大線程數量。
executor.getLargestPoolSize();
④ getPoolSize:線程池的線程數量。
executor.getPoolSize();
⑤ getActiveCount:獲取活動的線程數。
executor.getActiveCount();
通過擴展線程池進行監控
可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法進行監控。也可以在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時等。這個知識點有點細了,可以自行詳細學習。
7
線程池的實現原理
當線程池提交一個任務以後,線程池是如何處理這個任務的呢?處理流程圖如下:
線程池主要處理流程.jpeg
線程池判斷核心線程池是否已經滿了,如果沒有,則創建線程執行任務,如果滿了,進入下個流程。
線程池判斷工作隊列是否滿了,如果沒有,則將任務存儲在隊列中,如果滿了,進入下個流程。
線程池判斷線程池是否滿了,如果沒有,則創建線程執行任務,如果滿了,進入下個流程。
線程池判斷線程池滿了,按照策略處理無法執行的任務。
舉個例子:
假設某半成品加工工廠的車間有15個辦公座位,工廠的倉庫最多容納30件半成品。工廠開業時只有1名員工,來了任務就處理,但第二個任務來了後,原有的1名員工仍在工作,處理不了,所以就再招聘了一名員工,就這樣陸續招聘了10個在編員工。
再來了一個任務後,就把任務放倉庫,這10個員工中哪個空閒就會從倉庫取半成品加工,突然有一天任務來的太快,倉庫堆滿了30件半成品,而這10名員工都在工作,考慮效率就招聘了一名臨時員工,臨時員工在工作,倉庫又堆滿了30件半成品,又招聘了一名臨時員工,陸續招聘了5個臨時員工。
有一天倉庫堆滿了30件半成品,15個員工都在工作,倉庫已滿,車間辦公座位已滿,再有任務來就拒絕接收。
那為什麼要這樣設計呢?是想儘可能地避免獲取全局鎖(嚴重的可伸縮瓶頸:每次創建線程都需要獲取全局鎖)——在當前運行的線程數大於等於corePoolSize以後,幾乎所有的execute方法調會將任務放入阻塞隊列,然後由線程處理隊列中的任務,而任務放入阻塞隊列並不需要獲取全局鎖。
後續
歡迎在留言區留下你的觀點,一起討論提高。如果今天的文章讓你有新的啟發,歡迎轉發分享給更多人。