java並發編程之深入學習Concurrent包(七,CyclicBarrier類)

2021-01-19 暖爸的java家園

引言:

CyclicBarrier,就是循環的柵欄。柵欄,就是用來阻攔線程,通過它可以實現讓一些線程等待,直到到達約定的狀態後線程再同時開始執行。

CyclicBarrier是可以被重用的。

CyclicBarrier如下圖所示:

CyclicBarrier

CyclicBarrier的欄位:

lock: CyclicBarrier基於鎖實現線程的等待。

Generation : 柵欄的年齡,每次使用完都需要新的,其中有broken參數表示柵欄是否完好,是否有牛羊(線程)跑出去了。

trip: lock上的Condition條件,線程在條件滿足前先被放在Condition的等待隊列中。

parties: 線程的數量,在構造函數參數傳入,當前等待線程數=parties時,柵欄打開。

count: 當前已經被攔截的線程數

barrierCommand: 自定義Runnable類型可執行線程,在需要的線程都在等待之後且未被喚醒時執行。

如下代碼所示:

private static class Generation {

boolean broken = false;

}

/** The lock for guarding barrier entry */

private final ReentrantLock lock = new ReentrantLock();

/** Condition to wait on until tripped */

private final Condition trip = lock.newCondition();

/** The number of parties */

private final int parties;

/* The command to run when tripped */

private final Runnable barrierCommand;

/** The current generation */

private Generation generation = new Generation();

/**

* Number of parties still waiting. Counts down from parties to 0

* on each generation. It is reset to parties on each new

* generation or when broken.

*/

private int count;

CyclicBarrier的構造器如下所示:

public CyclicBarrier(int parties, Runnable barrierAction) {

}

public CyclicBarrier(int parties) {

}

參數parties: 指讓多少個線程或者任務等待至柵欄前的狀態;

參數barrierAction: Runnable線程類型參數,當這些線程都達到柵欄前等待狀時會執行該內容。

CyclicBarrier的實現原理:

如下代碼所示,CyclicBarrier實現步驟如下

1. 看generation是否被破壞(broken==true),如何會破壞generation呢?就像農場的籬笆,如果線程像牛羊一樣跑出去了,那麼籬笆就必須標記是壞的。

如下幾種情況,generation會被破壞:

線程被標記了要中斷

barrierCommand線程執行過程出現異常

等待中的線程被中斷

設置的超時時間到

2. 檢查線程是否被設置了中斷標誌,如果設置了,則該generation已被破壞

3. 計數器-1,代表又一個線程即將進入等待

4. 如果count=0,表示所有線程都已經進入等待,執行對應barrierCommand的線程,使用nextGeneration更新柵欄年齡。如果barrierCommand執行異常跳出,則柵欄置為被破壞。

5. 在for循環中,進入trip.await(),進入Condition等待隊列,如果超時或中斷,則generation被破壞。

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

private void nextGeneration() {

// signal completion of last generation

trip.signalAll();

// set up next generation

count = parties;

generation = new Generation();

}

/**

* Sets current barrier generation as broken and wakes up everyone.

* Called only while holding lock.

*/

private void breakBarrier() {

generation.broken = true;

count = parties;

trip.signalAll();

}

CyclicBarrier使用場景:

假設有10個馬拉松選手小組要去比賽,只有所有選手都就位後,才開始登記。

如下代碼所示:

class Runner implements Runnable {

private CyclicBarrier cyclicBarrierTest;

private String runnerName;

public Runner(CyclicBarrier cyclicBarrier, String name) {

this.cyclicBarrierTest = cyclicBarrier;

this.runnerName = name;

}

class CheckName {

private CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

public void start() {

List<Runner> runners= new ArrayList<>();

for (int i=0;i<10;i++){

runner.add(new Runner(cyclicBarrier,"runner:"+i));

}

Executor executor = Executors.newFixedThreadPool(8);

for ( Runner runner: runners) {

executor.execute(runner);

}

}

}

@Override

public void run() {

System.out.println(name + "START");

try {

cyclicBarrier.await();

System.out.println(time + ": "+ new Date());

} catch (Exception e) {

}

}

}

相關焦點

  • Java並發編程:CountDownLatch、CyclicBarrier和Semaphore
    在java 1.5中,提供了一些非常有用的輔助類來幫助我們進行並發編程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我們就來學習一下這三個輔助類的用法。一.CountDownLatch用法CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能。比如有一個任務A,它要等待其他4個任務執行完畢之後才能執行,此時就可以利用CountDownLatch來實現這種功能了。
  • Java並發編程系列23|循環屏障CyclicBarrier
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:CyclicBarrier的使用CyclicBarrier與CountDownLatch
  • 原創】Java並發編程系列01|開篇獲獎感言
    說白了,對我們程式設計師來說,並發編程在日常工作中是都要用到的,就算目前沒用到將來也要用到,工作中用不到面試也要用到。如何學習並發編程並發編程的學習確實有難度《Java並發編程實戰》中這麼說,「編寫正確的程序難,編寫正確的並發程序更難」。
  • 項目實踐,使用Cyclic Barrier在多線程中設置屏障
    代碼如下:package com.highcon.thread;import java.io.IOException;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import
  • 詳解java中的同步工具類CyclicBarrier
    之前介紹了java中另一個同步工具類CountDownLatch,這篇文章主要介紹CyclicBarrier。一、概念理解CyclicBarrier允許一組線程在到達某個柵欄點(common barrier point)互相等待,直到最後一個線程到達柵欄點,柵欄才會打開,處於阻塞狀態的線程恢復繼續執行。就比如說我們在打王者的時候,十個人必須全部加載到100%,才可以開局。否則只要有一個人沒有加載到100%,那這個遊戲就不能開始。
  • JAVA並發編程:並發問題的根源及主要解決方法
    而在java中,不變性變量即通過final修飾的變量,如String,Long,Double等類型都是Immutability的,它們的內部實現都是基於final關鍵字的。那這又和並發編程有什麼關係呢?其實啊,並發問題很大部分原因就是因為線程切換破壞了原子性,這又導致線程隨意對變量的讀寫破壞了數據的一致性。
  • 2020學習Java必看的3本書籍
    大家好,歡迎來到2020, 如果您正在考慮學習新事物或想要提高對Java基本技術的了解,那麼閱讀書籍絕對可以為您提供幫助。今天,我將分享過去幾年中一些最好的Java書籍,您可以在2018年閱讀這些書籍,以更好地學習Java和相關技術。1.
  • JAVA高並發網絡編程之BIO堵塞網絡編程
    上次說了網絡編程都是有作業系統統一的API的,每個語言有對它的實現,這次來一起說說通過java原生的socket編程完成BIO的網絡編程。
  • 大數據入門:Java和Scala編程對比
    在學習大數據之初,很多人都會對程式語言的學習有疑問,比如說大數據編程主要用什麼語言,在實際運用當中,大數據主流編程是Java,但是涉及到Spark、Kafka框架,還需要懂Scala。今天的大數據入門分享,我們就來對Java和Scala這兩門語言的編程做個對比。
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • 「原創」Java並發編程系列07|synchronized原理
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程中用到最多的關鍵字毫無疑問是synchronized。這篇文章就來探究下synchronized:synchronized如何使用?
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫寫在前面編寫並發程序是比較困難的,因為並發程序極易出現Bug,這些Bug有都是比較詭異的,很多都是沒辦法追蹤,而且難以復現。
  • 「原創」Java並發編程系列26|ConcurrentHashMap(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫終於輪到ConcurrentHashMap了,並發編程必備,也是面試必備。此外,ConcurrentHashMap和HashMap有相同的數據結構,在理解HashMap的基礎上學習ConcurrentHashMap
  • 「原創」Java並發編程系列14|AQS源碼分析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 14 篇,文末有本系列文章匯總。AbstractQueuedSynchronizer是Java並發包java.util.concurrent的核心基礎組件,是實現Lock的基礎。
  • 阿里P9都窺視已久的「Java並發實現原理:JDK源碼剖析」
    如果遇到複雜的多線程編程場景,就需要開發者基於這些簡單的機制解決複雜的線程同步問題。而從JDK 1.5開始,並發編程大師Doug Lea奉上了一個系統而全面的並發編程框架——JDK Concurrent包,裡面包含了各種原子操作、線程安全的容器、線程池和異步編程等內容。本書基於JDK 7和JDK 8,對整個Concurrent包進行全面的源碼剖析。
  • 「原創」Java並發編程系列06|你不知道的final
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫final在Java中是一個保留的關鍵字,可以修飾變量、方法和類。那麼fianl在並發編程中有什麼作用呢?本文就在對final常見應用總結基礎上,講解final並發編程中的應用。
  • 給Java新手的一些建議——Java知識點歸納(Java基礎部分)
    在這裡需要掌握的知識有:javac 編譯java文件為 class 文件java 命令的使用, 帶package的java類如何在命令行中啟動java程序涉及到的各個路徑(classpath, java。library。path, java運行的主目錄等)3.
  • 網工的Python之路:Concurrent.Futures
    這篇文章來介紹下另外一個可以實現並發編程的Python標準庫:concurrent.futures。)、協程(Coroutine)、I/O密集型(I/O-bound)、CPU密集型(CPU-bound)等術語,如何區分它們對學習Python的網工來說是一個難點,開篇講concurrent.futures之前先把上述這些術語之間的關係和區別給大家大致捋一下:1.
  • 「原創」Java並發編程系列33|深入理解線程池(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程必不可少的線程池,接下來分兩篇文章介紹線程池,本文是第一篇。介紹1.1 使用場景並發編程可以高效利用CPU資源,提升任務執行效率,但是多線程及線程間的切換也伴隨著資源的消耗。當遇到單個任務處理時間比較短,但需要處理的任務數量很大時,線程會頻繁的創建銷毀,大量的時間和資源都會浪費在線程的創建和銷毀上,效率很低。
  • Java從零開始學 - 第25天:掌握JUC中的阻塞隊列
    ;import java.util.concurrent.此時PriorityBlockingQueue就派上用場了,代碼如下:package com.itsoku.chat25;import java.util.concurrent.PriorityBlockingQueue;import java.util.concurrent.TimeUnit;/** * 跟著阿里p7學並發,微信公眾號