Java並發編程系列21|Condition-Lock的等待通知

2020-12-09 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

我們知道 synchronized 鎖通過 Object 類的 wait()和 notify()方法實現線程間的等待通知機制,而比 synchronized 更靈活 Lock 鎖同樣也有實現等待通知機制的方式,那就是條件 Condition。本文將從以下幾個方面介紹 Condition:

如何使用 Condition源碼分析Condition 的應用場景1. Condition 的使用

1.1 Condition 類提供的方法

等待方法:

// 當前線程進入等待狀態,如果其他線程調用 condition 的 signal 或者 signalAll 方法並且當前線程獲取 Lock 從 await 方法返回,如果在等待狀態中被中斷會拋出被中斷異常void await() throws InterruptedException// 當前線程進入等待狀態直到被通知,中斷或者超時long awaitNanos(long nanosTimeout)// 同第二個方法,支持自定義時間單位boolean await(long time, TimeUnit unit)throws InterruptedException// 當前線程進入等待狀態直到被通知,中斷或者到了某個時間boolean awaitUntil(Date deadline) throws InterruptedException

喚醒方法:

// 喚醒一個等待在 condition 上的線程,將該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競爭到 Lock 則可以從等待方法中返回void signal()// 與 1 的區別在於能夠喚醒所有等待在 condition 上的線程void signalAll()

1.2 使用舉例

啟動 waiter 和 signaler 兩個線程。waiter 線程獲取到鎖,檢查 flag=false 不滿足條件,執行 condition.await()方法將線程阻塞等待並釋放鎖。signaler 線程獲取到鎖之後更改條件,將 flag 變為 true,執行 condition.signalAll()通知喚醒等待線程,釋放鎖。waiter 線程被喚醒獲取到鎖,自旋檢查 flag=true 滿足條件,繼續執行。

public class ConditionTest {private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); private static volatile boolean flag = false; public static void main(String[] args) { Thread waiter = new Thread(new waiter()); waiter.start(); Thread signaler = new Thread(new signaler()); signaler.start(); } static class waiter implements Runnable { @Override public void run() { lock.lock(); try { while (!flag) { System.out.println(Thread.currentThread().getName() + "當前條件不滿足等待"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "接收到通知條件滿足"); } finally { lock.unlock(); } } } static class signaler implements Runnable { @Override public void run() { lock.lock(); try { flag = true; condition.signalAll(); } finally { lock.unlock(); } } }}

輸出結果:

Thread-0當前條件不滿足等待Thread-0接收到通知,條件滿足

2. Condition 與 wait/notify

Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,是屬於 Java 底層級別的。而 Condition 是語言級別的,具有更高的可控制性和擴展性。具體表現如下:

wait/notify 方式是響應中斷的,當線程處於 Object.wait()的等待狀態中,線程中斷會拋出中斷異常;Condition 有響應中斷和不響應中斷模式可以選擇。wait/notify 方式一個 synchronized 鎖只有一個等待隊列;一個 Lock 鎖可以根據不同的條件,new 多個 Condition 對象,每個對象包含一個等待隊列。需要注意的是,Condition 同 wait/notify 一樣,在等待與喚醒方法使用之前必須獲取到該鎖。

3. 源碼分析

Tips:需要在理解 AQS 及 ReentrantLock 基礎上閱讀本文源碼,給出這兩篇的連結:

【原創】14|AQS 源碼分析【原創】15|重入鎖 ReentrantLock

3.1 條件隊列

首先看 Condition 對象的創建:

ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();public Condition newCondition() {return sync.newCondition();}final ConditionObject newCondition() { return new ConditionObject();}

創建的 Condition 對象其實就是 ConditionObject 對象,ConditionObject 是 AbstractQueuedSynchronizer(AQS)的內部類,實現了 Condition 接口。

每個 ConditionObject 對象都有一個條件等待隊列,用於保存在該 Condition 對象上等待的線程

。條件等待隊列是一個單向鍊表,結點用的AQS的Node類,每個結點包含線程、next結點、結點狀態。ConditionObject通過持有頭尾指針類管理條件隊列。

注意區分 AQS 的同步隊列和 Condition 的條件隊列。線程搶鎖失敗時進入 AQS 同步隊列,AQS 同步隊列中的線程都是等待著隨時準備搶鎖的。線程因為沒有滿足某一條件而調用 condition.await()方法之後進入 Condition 條件隊列,Condition 條件隊列中的線程只能等著,沒有獲取鎖的機會。當條件滿足後調用 condition.signal()線程被喚醒,那麼線程就從 Condition 條件隊列移除,進入 AQS 同步隊列,被賦予搶鎖繼續執行的機會。

條件隊列源碼:

public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter;// 頭結點 private transient Node lastWaiter;// 尾結點 /** * 入隊操作 */ private Node addConditionWaiter() { Node t = lastWaiter; // 如果尾結點取消等待了,將其清除出去,並檢查整個條件隊列將已取消的所有結點清除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 這個方法會遍歷整個條件隊列,然後會將已取消的所有結點清除出隊列 t = lastWaiter; } // 將當前線程構造成結點,加入隊尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;// 維護尾結點指針 return node; } /** * 遍歷整個條件隊列,清除已取消等待的結點 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null;// 用於保存前一個結點 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // t結點狀態不是Node.CONDITION,說明已經取消等待,刪除 t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t;// 下次循環中t結點的前一個結點 t = next; } }}static final class Node { volatile Thread thread;// 每一個節點對應一個線程 Node nextWaiter;// next結點 volatile int waitStatus;// 結點狀態 static final int CONDITION = -2;// 結點狀態:當前節點進入等待隊列中 ...}

3.2 await()

當調用 condition.await()方法後會使得線程進入到條件隊列,此時線程將被阻塞。當調用 condition.signal()方法後,線程從條件隊列進入 AQS 同步隊列排隊等鎖。線程在 AQS 中發生的事情這裡就不介紹了,不明白的可以看下以前 AQS 的文章【原創】14|AQS 源碼分析。

await()方法源碼:

/*** 當前線程被阻塞,並加入條件隊列 * 線程在AQS同步隊列中被喚醒後嘗試獲取鎖 */public final void await() throws InterruptedException { // 響應打斷 if (Thread.interrupted()) throw new InterruptedException(); // 將當前線程構造成結點,加入條件隊列隊尾,上文詳細分析了該方法 Node node = addConditionWaiter(); // 釋放鎖,線程阻塞前必須將鎖釋放,下文詳解fullyRelease()方法 int savedState = fullyRelease(node); int interruptMode = 0; /* * 1.isOnSyncQueue()檢查node是否在AQS同步隊列中,不在同步隊列中返回false,下文詳解isOnSyncQueue()方法 * 2.如果node不在AQS同步隊列中,將當前線程阻塞 * 3.當其他代碼調用signal()方法,線程進入AQS同步隊列後被喚醒,繼續從這裡阻塞的地方開始執行 * 4.注意這裡while循環的自旋,線程被喚醒以後還要再檢查一下node是否在AQS同步隊列中 */ while (!isOnSyncQueue(node)) { // 檢查node是否在AQS同步隊列中 LockSupport.park(this); // 阻塞,線程被喚醒後從這裡開始執行 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /* * 到這裡,是當前線程在AQS同步隊列中被喚醒了,嘗試獲取鎖 * acquireQueued()方法搶鎖,搶不到鎖就在同步隊列中阻塞 * acquireQueued()方法是AQS文章中詳細重點講解過的這裡不詳細分析了 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}

fullyRelease()方法:

/*** 將node線程的鎖全部釋放 * 「全部」是指多次重入的情況,這裡一次全部釋放 */final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 鎖狀態 if (release(savedState)) {// 釋放鎖 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }}

isOnSyncQueue()方法:

/*** 檢查node是否在AQS同步隊列中,在同步隊列中返回true */final boolean isOnSyncQueue(Node node) { // 狀態為Node.CONDITION條件等待狀態,肯定是在條件隊列中,而不在同步隊列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果node已經有後繼節點next,那肯定是在同步隊列了 if (node.next != null) return true; // 遍歷同步隊列,查看是否有與node相等的結點 return findNodeFromTail(node);}/** * 從同步隊列的隊尾開始從後往前遍歷找,如果找到相等的,說明在同步隊列,否則就是不在同步隊列 */private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; }}

3.3 signal()

調用 condition.signal()方法後,線程從 Condition 條件隊列移除,進入 AQS 同步隊列排隊等鎖。

注意:正常情況下 signal 只是將線程從 Condition 條件隊列轉移到 AQS 同步隊列,並沒有喚醒線程。線程的喚醒時機是 AQS 中線程的前驅節點釋放鎖之後。

public final void signal() {// 驗證當前線程持有鎖才能調用該方法 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first);}/** * 從條件隊列隊頭往後遍歷,找出第一個需要轉移的結點node,將node從條件隊列轉移到AQS同步隊列 * 為什麼需要遍歷找?因為前有些線程會取消等待,但是可能還在條件隊列中 */private void doSignal(Node first) { do { // 將first中條件隊列中移除,將first的next結點作為頭結點賦值給firstWaiter if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; /* * transferForSignal()將first結點加入AQS同步隊列 * 如果first結點加入同步隊列失敗,是因為first結點取消了Node.CONDITION狀態,原因在下面transferForSignal()的講解中說明 * 如果first結點加入同步隊列失敗,那麼選擇first後面的第一個結點進行轉移,依此類推 */ } while (!transferForSignal(first) && // 將first結點加入AQS同步隊列 (first = firstWaiter) != null); // first結點加入同步隊列失敗,選擇first後面的結點進行轉移}/** * 將結點轉移到同步隊列 * @return true-代表成功轉移;false-代表在signal之前,節點已經取消等待了 */final boolean transferForSignal(Node node) { /* * CAS設置結點狀態 * CAS失敗說明此node的waitStatus已不是Node.CONDITION,說明節點已經取消。既然已經取消,也就不需要轉移了,方法返回,轉移後面一個節點 * CAS失敗為什麼不是其他線程搶先操作了呢?因為這裡還持有lock獨佔鎖,只有當前線程可以訪問。 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);// 自旋進入同步隊列的隊尾 int ws = p.waitStatus; // 正常情況下不會走這裡,這裡是前驅節點取消或者 CAS 失敗的情況 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}static final class Node { volatile Thread thread;// 每一個結點對應一個線程 Node nextWaiter;// next結點 volatile int waitStatus;// 結點狀態 static final int CONDITION = -2;// 結點狀態:當前結點進入等待隊列中}

3.4 源碼過程總結

ReentrantLock lock = new ReentrantLock();創建 lock 鎖,對應生成 AQS 同步隊列,一個 ReentrantLock 鎖對應一個 AQS 同步隊列。Condition condition = lock.newCondition();創建 condition,對應生成 condition 條件隊列。線程 A 調用condition.await();,線程 A 阻塞並加入 condition 同步隊列。線程 B 調用condition.signal();,線程 A 阻塞從 condition1 同步隊列轉移到 AQS 同步隊列的隊尾。當 AQS 隊列中線程 A 的前驅節點線程執行完並釋放鎖時,將線程 A 喚醒。線程 A 被喚醒之後搶鎖,執行邏輯代碼。4. 應用

Condition 實現的生產者消費者問題。

class BoundedBuffer {final ReentrantLock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去 } finally { lock.unlock(); } } // 消費 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 隊列為空,等待,直到隊列 not empty,才能繼續消費 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去 return x; } finally { lock.unlock(); } }}

生產者線程調用 put()方法向隊列中添加對象,當隊列滿時,生產者線程就阻塞等待。消費者線程調用 take()方法取出隊列中的對象,取出對象後隊列可以添加對象了,通知被阻塞的生產者線程。生產者線程被喚醒後,從阻塞的位置開始執行,繼續向隊列中添加對象。同樣,消費者取出隊列中對象時,發現隊列為空了也會阻塞等待,生產者線程添加對象之後會通知消費者線程。總結

Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,而 Condition 與 Lock 配合完成等待通知機制。

Condition 比 wait 和 notify 具有更高的可控制性和擴展性,一個 Lock 鎖可以有多個 Condition 條件,此外 Condition 還有響應中斷和不響應中斷模式可以選擇。Condition 的使用與 wait/notify 一樣,在等待與喚醒方法使用之前必須獲取到鎖。

Condition 的實現原理:每個 condition 都有一個條件隊列,調用 condition.await()方法將線程阻塞後線程就進入了條件隊列,調用 condition.sigal()方法後線程從 condition 條件隊列轉移到 AQS 同步隊列等鎖,該線程的前一節點釋放鎖之後會喚醒該線程搶鎖執行。

Condition 多用於實現的生產者消費者問題。

相關焦點

  • 狂神說java之JUC並發編程(一)
    ();並發、並行並發編程:並發、並行並發(多線程操作同一個資源)CPU 一核 ,模擬出來多條線程,天下武功,唯快不破,快速交替並行(多個人一起行走)CPU 多核 ,多個線程可以同時執行;線程池public class Test1 {    public
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • Java並發編程系列23|循環屏障CyclicBarrier
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:
  • 「原創」Java並發編程系列28|Copy-On-Write容器
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫正文前面兩篇講了並發編程中線程安全HashMap:ConcurrentHashMap
  • Java並發編程系列20|StampedLock源碼解析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 20 篇,文末有本系列文章匯總。上一篇介紹了StampedLock存在的意義以及如何使用,按照這個系列的風格大家也應該猜到了,這一篇就是的源碼分析。
  • C++並發condition_variable
    有意修改變量的線程必須獲得 std::mutex (常通過 std::lock_guard )在保有鎖時進行修改在 std::condition_variable 上執行 notifyone 或 notify_all (不需要為通知保有鎖)即使共享變量是原子的,也必須在互斥下修改它,以正確地發送修改至等待的線程。
  • 原創】Java並發編程系列01|開篇獲獎感言
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫為什麼要學並發編程我曾聽一個從事15年開發工作的技術人員說過,他剛工作時的並發編程第一原則就是不要寫並發程序。
  • Java多線程並發編程中並發容器第二篇之List的並發類講解
    Java多線程並發編程中並發容器第二篇之List的並發類講解概述本文我們將詳細講解list對應的並發容器以及用代碼來測試ArrayList、vector以及CopyOnWriteArrayList在100個線程向list中添加1000個數據後的比較
  • 「原創」Java並發編程系列18|讀寫鎖(下)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 18 篇,文末有本系列文章匯總。上篇為【原創】Java並發編程系列17 | 讀寫鎖八講(上),沒看過的可以先看看。本文是下篇,從「源碼分析寫鎖的獲取與釋放」開始。7.
  • java並發編程之深入學習Concurrent包(十三,雙端阻塞隊列)
    引言:春節疫情的關係,宅在家,有空學習一下java並發中的內容,順便發出來請大家一起瞅瞅。上一章學習了阻塞隊列,這次一起學習下雙端阻塞隊列。LinkedBlockingDeque簡介:LinkedBlockingDeque是雙向鍊表實現的雙端阻塞隊列。該阻塞隊列可以從隊列的頭和尾進行插入和刪除,且該阻塞隊列是線程安全的。
  • Java 面試寶典!並發編程 71 道題及答案全送上!
    而在java 5之後,可以使用阻塞隊列來實現,此方式大大簡少了代碼量,使得多線程編程更加容易,安全方面也有保障。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。21、什麼是不可變對象,它對寫並發應用有什麼幫助?不可變對象即對象一旦被創建它的狀態就不能改變,反之即為可變對象。 不可變對象的類即為不可變類。
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫寫在前面編寫並發程序是比較困難的,因為並發程序極易出現Bug,這些Bug有都是比較詭異的,很多都是沒辦法追蹤,而且難以復現。
  • Java 並發編程之美-線程相關的基礎知識
    借用 Java 並發編程實踐中的話:編寫正確的程序並不容易,而編寫正常的並發程序就更難了;相比於順序執行的情況,多線程的線程安全問題是微妙而且出乎意料的
  • 30秒帶你讀懂Java並發包工具(JUC)之Condition(並發條件變量)
    了解ConditionJDK1.5中引入了java.util.concurrent.locks.Condition接口,用來替代wait/notify。wait/notify大家應該很了解,主要用來解決多線程的協調問題(等待/通知),但是其存在如下幾個問題:早喚醒問題:wait/notify是依賴Object+synchronized來實現,如果存在多個線程wait(),那麼通過notify()方法只能喚醒一個線程,而且這個線程不一定是我們想要喚醒的線程,導致運行錯誤。
  • Java並發編程之支持並發的list集合你知道嗎
    Java並發編程之-list集合的並發.我們都知道Java集合類中的arrayList是線程不安全的。那麼怎麼證明是線程不安全的呢?怎麼解決在並發環境下使用安全的list集合類呢?本篇是《凱哥(凱哥Java:kagejava)並發編程學習》系列之《並發集合系列》教程的第一篇:本文主要內容:怎麼證明arrayList不是線程安全的?怎麼解決這個問題?以及遇到問題解決的四個步驟及從源碼來分析作者思路。一:怎麼證明arrayList在並發情況下是線程不安全的呢?
  • Java中synchronized鎖和lock鎖的比較
    Java並發之顯式鎖和隱式鎖的區別在面試的過程中有可能會問到:在Java並發編程中,鎖有兩種實現:使用隱式鎖和使用顯示鎖分別是什麼?兩者的區別是什麼?所謂的顯式鎖和隱式鎖的區別也就是說說Synchronized(下文簡稱:sync)和lock(下文就用ReentrantLock來代之lock)的區別。
  • Java並發包下鎖學習第一篇:介紹及學習安排
    Java並發包下鎖學習第一篇:介紹及學習安排在Java並發編程中,實現鎖的方式有兩種,分別是:可以使用同步鎖(synchronized關鍵字的鎖),還有lock接口下的鎖。在這個系列中,我們將會學習並發包下鎖實現的原理(我們將跟著源碼來分析)、什麼是可重入鎖、公平鎖和非公平鎖怎麼定義的、為什麼synchronized關鍵字的鎖和ReentrantLock默認會選擇非公平鎖?讀寫鎖和獨佔鎖的比較、跟著源碼我們來分析讀寫鎖等和鎖相關的知識。學完這個系列教程後,大家將對並發鎖有更新的理解,歡迎大家一起學習。
  • JAVA並發編程:線程並發工具類Callable、Future 和FutureTask的使用
    get() 方法:獲取返回結果get(long timeout, TimeUnit unit) 方法:獲取結果時設置等待時長isCancelled() 方法:如果此任務在正常完成之前被取消,則返回{@code true}。isDone() 方法:如果此任務完成,則返回{@code true}。
  • Java並發編程之驗證volatile的可見性
    Java並發編程之驗證volatile的可見性通過系列文章的學習,凱哥已經介紹了volatile的三大特性。1:保證可見性 2:不保證原子性 3:保證順序。那麼怎麼來驗證可見性呢?本文凱哥將通過代碼演示來證明volatile的可見性。
  • 你知道JDK中Condition到底是個什麼東西麼?
    import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock; public class ConditionDemo {     static int data = 0;