Java並發編程系列23|循環屏障CyclicBarrier

2021-01-19 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:

CyclicBarrier的使用CyclicBarrier與CountDownLatch比較CyclicBarrier源碼解析1. CyclicBarrier的使用

CyclicBarrier要做的事情是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程一起執行。

1.1 API

CyclicBarrier(int parties)

:構造方法,parties表示攔截線程的數量。

CyclicBarrier(int parties, Runnable barrierAction)

:barrierAction用於在線程到達屏障時優先執行b,用於處理更加複雜的業務場景。

await()

:將當前線程阻塞,等到所有的線程都到達指定的臨界點後一起執行。

getNumberWaiting()

:獲取當前有多少個線程阻塞等待在臨界點上。

reset()

:將屏障重置為初始狀態。

1.2 使用舉例

舉個例子說明CyclicBarrier的使用:8個運動員參加比賽,運動員可能到達賽場的時間不一樣,要等8個運動員到齊了才開始比賽,代碼如下:

public class CyclicBarrierTest {

private static CyclicBarrier barrier = new CyclicBarrier(8, () -> {

System.out.println("所有運動員入場,裁判員一聲令下!!!");

});

public static void main(String[] args) {

System.out.println("運動員準備進場,全場歡呼......");

for (int i = 0; i < 8; i++) {

new Thread() {

public void run() {

System.out.println(Thread.currentThread().getName() + " 運動員到達起點,準備好了!!!");

try {

barrier.await();// 運動員等待,等所有運動員全部到齊後一起開始比賽

} catch (Exception e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName() + " 運動員出發!!!");

};

}.start();

}

}

}

輸出結果:

運動員準備進場,全場歡呼......

Thread-0 運動員到達起點,準備好了!!!

Thread-2 運動員到達起點,準備好了!!!

Thread-4 運動員到達起點,準備好了!!!

Thread-1 運動員到達起點,準備好了!!!

Thread-3 運動員到達起點,準備好了!!!

Thread-5 運動員到達起點,準備好了!!!

Thread-6 運動員到達起點,準備好了!!!

Thread-7 運動員到達起點,準備好了!!!

所有運動員入場,裁判員一聲令下!!!

Thread-7 運動員出發!!!

Thread-0 運動員出發!!!

Thread-1 運動員出發!!!

Thread-4 運動員出發!!!

Thread-2 運動員出發!!!

Thread-6 運動員出發!!!

Thread-5 運動員出發!!!

Thread-3 運動員出發!!!

2. 與CountDownLatch比較

CountDownLatch用於一個線程等待若干個其他線程執行完任務之後才執行,強調一個線程等待,這個線程會阻塞。而CyclicBarrier用於一組線程互相等待至某個狀態,然後這一組線程再同時執行,強調的是多個線程互等,這多個線程阻塞,等大家都完成,再攜手共進。CountDownLatch是不能復用的,而CyclicLatch是可以復用的。使用reset()方法將屏障重置為初始狀態之後就可以復用。CyclicBarrier提供了更多的方法,能夠通過getNumberWaiting()獲取阻塞線程的數量,通過isBroken()方法可以知道阻塞的線程是否被中斷。3. 源碼分析

CyclicBarrier是通過Lock的Condition實現的,每個CyclicBarrier對應個Lock鎖和該鎖的condition條件。

創建CyclicBarrier時設置一個count計數,當調用await()時做兩件事:①將count-1 ②將線程阻塞並構造成結點加入condition條件隊列。

當count變為0時,達到等待線程數量要求,condition將條件隊列中的線程全部喚醒。

3.1 類結構

public class CyclicBarrier {

private static class Generation { // 內部類,當有parties個線程到達barrier就會更新換代

boolean broken = false; // 是否損壞

}

private final ReentrantLock lock = new ReentrantLock(); // 重入鎖

private final Condition trip = lock.newCondition();

private final int parties; // 等待線程總數量

private final Runnable barrierCommand; // 達到等待線程數量後執行的線程

private Generation generation = new Generation(); // 當有parties個線程到達barrier,就會更新換代

private int count; // 記錄當前線程數量

}

3.2 構造方法

將parties設置為count值,設置達到等待線程數量後優先執行的線程

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties; // 保存parties可循環使用

this.count = parties; // 將parties設置為count值

this.barrierCommand = barrierAction;// 設置達到等待線程數量後優先執行的線程

}

3.3 await()

await()方法:

①將count-1

②將線程阻塞並構造成結點加入condition條件隊列。

③當count變為0時,condition將條件隊列中的線程全部喚醒。

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier(); // 代失效,喚醒所有線程

throw new InterruptedException();

}

int index = --count; // 計數

if (index == 0) { // 達到要求數量

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run(); // 達到等待線程數量後執行barrierCommand

ranAction = true;

nextGeneration(); // 喚醒本代所有線程,生成新一代,重置count

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// 線程數量未達到要求數量,將線程掛起等待

for (;;) {

try {

if (!timed)

trip.await(); // 將線程加入condition隊列掛起

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && !g.broken) {

breakBarrier();

throw ie;

} else {

Thread.currentThread().interrupt();

}

}

// 特殊情況處理

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

// 當前代失效,喚醒所有線程

private void breakBarrier() {

generation.broken = true;

count = parties;

trip.signalAll();

}

// 喚醒本代所有線程,生成新一代,重置count

private void nextGeneration() {

trip.signalAll();

count = parties;

generation = new Generation();

}

4. 總結

CyclicBarrier要做的事情是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。

注意CyclicBarrier與CountDownLatch的區別:CountDownLatch用於一個線程等待若干個其他線程執行完任務之後才執行,而CyclicBarrier強調的是多個線程互等,等大家都完成,再攜手共進。此外,CyclicBarrier功能更加強大,可以循環使用。

CyclicBarrier是通過Lock的Condition實現的,每個CyclicBarrier對應個Lock鎖和該鎖的condition條件。創建CyclicBarrier時設置一個count計數,當調用await()時做兩件事:①將count-1 ②將線程阻塞並構造成結點加入condition條件隊列。當count變為0時,達到等待線程數量要求,condition將條件隊列中的線程全部喚醒。

相關焦點

  • Java並發編程:CountDownLatch、CyclicBarrier和Semaphore
    在java 1.5中,提供了一些非常有用的輔助類來幫助我們進行並發編程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我們就來學習一下這三個輔助類的用法。我們暫且把這個狀態就叫做barrier,當調用await()方法之後,線程就處於barrier了。
  • java並發編程之深入學習Concurrent包(七,CyclicBarrier類)
    引言:CyclicBarrier,就是循環的柵欄。柵欄,就是用來阻攔線程,通過它可以實現讓一些線程等待,直到到達約定的狀態後線程再同時開始執行。CyclicBarrier是可以被重用的。CyclicBarrier如下圖所示:
  • 原創】Java並發編程系列01|開篇獲獎感言
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫為什麼要學並發編程我曾聽一個從事15年開發工作的技術人員說過,他剛工作時的並發編程第一原則就是不要寫並發程序。
  • 項目實踐,使用Cyclic Barrier在多線程中設置屏障
    Cyclic Barrier的使用場景是, 每個線程在執行時, 都會碰到屏障, 該屏障會攔截所有線程的執行(通過await() 方法實現) ;當指定數量的線程全部就位時,所有的線程再跨過屏障同時執行。現舉例說明CountDown Latch和Cyclic Barrier的區別。假設有A、B、C 3個線程, 其中C是最後一個加人的線程。
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • JAVA並發編程:並發問題的根源及主要解決方法
    而在java中,不變性變量即通過final修飾的變量,如String,Long,Double等類型都是Immutability的,它們的內部實現都是基於final關鍵字的。那這又和並發編程有什麼關係呢?其實啊,並發問題很大部分原因就是因為線程切換破壞了原子性,這又導致線程隨意對變量的讀寫破壞了數據的一致性。
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫寫在前面編寫並發程序是比較困難的,因為並發程序極易出現Bug,這些Bug有都是比較詭異的,很多都是沒辦法追蹤,而且難以復現。
  • 「原創」Java並發編程系列06|你不知道的final
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫final在Java中是一個保留的關鍵字,可以修飾變量、方法和類。那麼fianl在並發編程中有什麼作用呢?本文就在對final常見應用總結基礎上,講解final並發編程中的應用。
  • 「原創」Java並發編程系列07|synchronized原理
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程中用到最多的關鍵字毫無疑問是synchronized。這篇文章就來探究下synchronized:synchronized如何使用?
  • 詳解java中的同步工具類CyclicBarrier
    之前介紹了java中另一個同步工具類CountDownLatch,這篇文章主要介紹CyclicBarrier。一、概念理解CyclicBarrier允許一組線程在到達某個柵欄點(common barrier point)互相等待,直到最後一個線程到達柵欄點,柵欄才會打開,處於阻塞狀態的線程恢復繼續執行。就比如說我們在打王者的時候,十個人必須全部加載到100%,才可以開局。否則只要有一個人沒有加載到100%,那這個遊戲就不能開始。
  • Java並發編程系列20|StampedLock源碼解析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 20 篇,文末有本系列文章匯總。上一篇介紹了StampedLock存在的意義以及如何使用,按照這個系列的風格大家也應該猜到了,這一篇就是的源碼分析。
  • 「原創」Java並發編程系列14|AQS源碼分析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 14 篇,文末有本系列文章匯總。AbstractQueuedSynchronizer是Java並發包java.util.concurrent的核心基礎組件,是實現Lock的基礎。
  • 教你正確使用Java高並發Barrier
    什麼是高並發Barrier?在正確使用Barrier之前,你需要知道什麼是高並發Barrier。Barrier,顧名思義,就是屏障的意思。在Java中,並發屏障意思就是等待執行任務的多個線程均執行到指定的點,然後在統一往下執行。
  • JAVA高並發網絡編程之BIO堵塞網絡編程
    上次說了網絡編程都是有作業系統統一的API的,每個語言有對它的實現,這次來一起說說通過java原生的socket編程完成BIO的網絡編程。
  • Java並發包CountDownLatch、CyclicBarrier、Semaphore原理解析
    二、CyclicBarrier2.1、CyclicBarrier的使用CyclicBarrier可以理解為一個循環同步屏障,定義一個同步屏障之後,當一組線程都全部達到同步屏障之前都會被阻塞,直到最後一個線程達到了同步屏障之後才會被打開,其他線程才可繼續執行。
  • 「原創」Java並發編程系列18|讀寫鎖(下)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 18 篇,文末有本系列文章匯總。上篇為【原創】Java並發編程系列17 | 讀寫鎖八講(上),沒看過的可以先看看。本文是下篇,從「源碼分析寫鎖的獲取與釋放」開始。7.
  • Java中的並發工具類CountDownLatch
    Java中的並發工具類在多線程編程的時候,有時候需要控制並發流,Java本身提供了幾個控制並發的工具類,比如CountDownLatch,CyclicBarrier,Semaphore2、CyclicBarrier同步屏障。可以循環使用的屏障,讓一組線程到達一個屏障時被阻塞,等到最後一個線程到達屏障時才會開門。
  • 大數據入門:Java和Scala編程對比
    在學習大數據之初,很多人都會對程式語言的學習有疑問,比如說大數據編程主要用什麼語言,在實際運用當中,大數據主流編程是Java,但是涉及到Spark、Kafka框架,還需要懂Scala。今天的大數據入門分享,我們就來對Java和Scala這兩門語言的編程做個對比。
  • 「原創」Java並發編程系列33|深入理解線程池(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程必不可少的線程池,接下來分兩篇文章介紹線程池,本文是第一篇。介紹1.1 使用場景並發編程可以高效利用CPU資源,提升任務執行效率,但是多線程及線程間的切換也伴隨著資源的消耗。當遇到單個任務處理時間比較短,但需要處理的任務數量很大時,線程會頻繁的創建銷毀,大量的時間和資源都會浪費在線程的創建和銷毀上,效率很低。
  • 「原創」Java並發編程系列26|ConcurrentHashMap(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫終於輪到ConcurrentHashMap了,並發編程必備,也是面試必備。HashMap 作為使用最頻繁的集合之一,在多線程環境下是不能用的,因為 HashMap 的設計上就沒有考慮並發環境,極易導致線程安全問題。為了解決該問題,提供了 Hashtable 和 Collections.synchronizedMap(hashMap)兩種解決方案,但是這兩種方案都是對讀寫加獨佔鎖,一個線程在讀時其他線程必須等待,吞吐量和性能都較低。