引言:
CyclicBarrier,就是循環的柵欄。柵欄,就是用來阻攔線程,通過它可以實現讓一些線程等待,直到到達約定的狀態後線程再同時開始執行。
CyclicBarrier是可以被重用的。
CyclicBarrier如下圖所示:
CyclicBarrier的欄位:
lock: CyclicBarrier基於鎖實現線程的等待。
Generation : 柵欄的年齡,每次使用完都需要新的,其中有broken參數表示柵欄是否完好,是否有牛羊(線程)跑出去了。
trip: lock上的Condition條件,線程在條件滿足前先被放在Condition的等待隊列中。
parties: 線程的數量,在構造函數參數傳入,當前等待線程數=parties時,柵欄打開。
count: 當前已經被攔截的線程數
barrierCommand: 自定義Runnable類型可執行線程,在需要的線程都在等待之後且未被喚醒時執行。
如下代碼所示:
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
CyclicBarrier的構造器如下所示:
public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
參數parties: 指讓多少個線程或者任務等待至柵欄前的狀態;
參數barrierAction: Runnable線程類型參數,當這些線程都達到柵欄前等待狀時會執行該內容。
CyclicBarrier的實現原理:
如下代碼所示,CyclicBarrier實現步驟如下
1. 看generation是否被破壞(broken==true),如何會破壞generation呢?就像農場的籬笆,如果線程像牛羊一樣跑出去了,那麼籬笆就必須標記是壞的。
如下幾種情況,generation會被破壞:
線程被標記了要中斷
barrierCommand線程執行過程出現異常
等待中的線程被中斷
設置的超時時間到
2. 檢查線程是否被設置了中斷標誌,如果設置了,則該generation已被破壞
3. 計數器-1,代表又一個線程即將進入等待
4. 如果count=0,表示所有線程都已經進入等待,執行對應barrierCommand的線程,使用nextGeneration更新柵欄年齡。如果barrierCommand執行異常跳出,則柵欄置為被破壞。
5. 在for循環中,進入trip.await(),進入Condition等待隊列,如果超時或中斷,則generation被破壞。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
CyclicBarrier使用場景:
假設有10個馬拉松選手小組要去比賽,只有所有選手都就位後,才開始登記。
如下代碼所示:
class Runner implements Runnable {
private CyclicBarrier cyclicBarrierTest;
private String runnerName;
public Runner(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrierTest = cyclicBarrier;
this.runnerName = name;
}
class CheckName {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
public void start() {
List<Runner> runners= new ArrayList<>();
for (int i=0;i<10;i++){
runner.add(new Runner(cyclicBarrier,"runner:"+i));
}
Executor executor = Executors.newFixedThreadPool(8);
for ( Runner runner: runners) {
executor.execute(runner);
}
}
}
@Override
public void run() {
System.out.println(name + "START");
try {
cyclicBarrier.await();
System.out.println(time + ": "+ new Date());
} catch (Exception e) {
}
}
}