本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫
本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:
CyclicBarrier的使用CyclicBarrier與CountDownLatch比較CyclicBarrier源碼解析1. CyclicBarrier的使用
CyclicBarrier要做的事情是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程一起執行。
1.1 API
CyclicBarrier(int parties)
:構造方法,parties表示攔截線程的數量。
CyclicBarrier(int parties, Runnable barrierAction)
:barrierAction用於在線程到達屏障時優先執行b,用於處理更加複雜的業務場景。
await()
:將當前線程阻塞,等到所有的線程都到達指定的臨界點後一起執行。
getNumberWaiting()
:獲取當前有多少個線程阻塞等待在臨界點上。
reset()
:將屏障重置為初始狀態。
1.2 使用舉例
舉個例子說明CyclicBarrier的使用:8個運動員參加比賽,運動員可能到達賽場的時間不一樣,要等8個運動員到齊了才開始比賽,代碼如下:
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(8, () -> {
System.out.println("所有運動員入場,裁判員一聲令下!!!");
});
public static void main(String[] args) {
System.out.println("運動員準備進場,全場歡呼......");
for (int i = 0; i < 8; i++) {
new Thread() {
public void run() {
System.out.println(Thread.currentThread().getName() + " 運動員到達起點,準備好了!!!");
try {
barrier.await();// 運動員等待,等所有運動員全部到齊後一起開始比賽
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 運動員出發!!!");
};
}.start();
}
}
}
輸出結果:
運動員準備進場,全場歡呼......
Thread-0 運動員到達起點,準備好了!!!
Thread-2 運動員到達起點,準備好了!!!
Thread-4 運動員到達起點,準備好了!!!
Thread-1 運動員到達起點,準備好了!!!
Thread-3 運動員到達起點,準備好了!!!
Thread-5 運動員到達起點,準備好了!!!
Thread-6 運動員到達起點,準備好了!!!
Thread-7 運動員到達起點,準備好了!!!
所有運動員入場,裁判員一聲令下!!!
Thread-7 運動員出發!!!
Thread-0 運動員出發!!!
Thread-1 運動員出發!!!
Thread-4 運動員出發!!!
Thread-2 運動員出發!!!
Thread-6 運動員出發!!!
Thread-5 運動員出發!!!
Thread-3 運動員出發!!!
2. 與CountDownLatch比較
CountDownLatch用於一個線程等待若干個其他線程執行完任務之後才執行,強調一個線程等待,這個線程會阻塞。而CyclicBarrier用於一組線程互相等待至某個狀態,然後這一組線程再同時執行,強調的是多個線程互等,這多個線程阻塞,等大家都完成,再攜手共進。CountDownLatch是不能復用的,而CyclicLatch是可以復用的。使用reset()方法將屏障重置為初始狀態之後就可以復用。CyclicBarrier提供了更多的方法,能夠通過getNumberWaiting()獲取阻塞線程的數量,通過isBroken()方法可以知道阻塞的線程是否被中斷。3. 源碼分析
CyclicBarrier是通過Lock的Condition實現的,每個CyclicBarrier對應個Lock鎖和該鎖的condition條件。
創建CyclicBarrier時設置一個count計數,當調用await()時做兩件事:①將count-1 ②將線程阻塞並構造成結點加入condition條件隊列。
當count變為0時,達到等待線程數量要求,condition將條件隊列中的線程全部喚醒。
3.1 類結構
public class CyclicBarrier {
private static class Generation { // 內部類,當有parties個線程到達barrier就會更新換代
boolean broken = false; // 是否損壞
}
private final ReentrantLock lock = new ReentrantLock(); // 重入鎖
private final Condition trip = lock.newCondition();
private final int parties; // 等待線程總數量
private final Runnable barrierCommand; // 達到等待線程數量後執行的線程
private Generation generation = new Generation(); // 當有parties個線程到達barrier,就會更新換代
private int count; // 記錄當前線程數量
}
3.2 構造方法
將parties設置為count值,設置達到等待線程數量後優先執行的線程
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties; // 保存parties可循環使用
this.count = parties; // 將parties設置為count值
this.barrierCommand = barrierAction;// 設置達到等待線程數量後優先執行的線程
}
3.3 await()
await()方法:
①將count-1
②將線程阻塞並構造成結點加入condition條件隊列。
③當count變為0時,condition將條件隊列中的線程全部喚醒。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); // 代失效,喚醒所有線程
throw new InterruptedException();
}
int index = --count; // 計數
if (index == 0) { // 達到要求數量
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 達到等待線程數量後執行barrierCommand
ranAction = true;
nextGeneration(); // 喚醒本代所有線程,生成新一代,重置count
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 線程數量未達到要求數量,將線程掛起等待
for (;;) {
try {
if (!timed)
trip.await(); // 將線程加入condition隊列掛起
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 特殊情況處理
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 當前代失效,喚醒所有線程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 喚醒本代所有線程,生成新一代,重置count
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
4. 總結
CyclicBarrier要做的事情是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。
注意CyclicBarrier與CountDownLatch的區別:CountDownLatch用於一個線程等待若干個其他線程執行完任務之後才執行,而CyclicBarrier強調的是多個線程互等,等大家都完成,再攜手共進。此外,CyclicBarrier功能更加強大,可以循環使用。
CyclicBarrier是通過Lock的Condition實現的,每個CyclicBarrier對應個Lock鎖和該鎖的condition條件。創建CyclicBarrier時設置一個count計數,當調用await()時做兩件事:①將count-1 ②將線程阻塞並構造成結點加入condition條件隊列。當count變為0時,達到等待線程數量要求,condition將條件隊列中的線程全部喚醒。