「原創」Java並發編程系列14|AQS源碼分析

2020-12-22 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

本文為何適原創並發編程系列第 14 篇,文末有本系列文章匯總。

AbstractQueuedSynchronizer是Java並發包

java.util.concurrent

的核心基礎組件,是實現Lock的基礎。

AQS實現了對同步狀態的管理,以及對阻塞線程進行排隊、等待通知等,本文將從源碼角度深入理解AQS的實現原理。

建議:本文涉及大量源碼,在源碼中加了很多詳細的注釋,用電腦閱讀會更方便。

1. AQS類結構

屬性

// 屬性private transient volatile Node head;// 同步隊列頭節點private transient volatile Node tail;// 同步隊列尾節點private volatile int state;// 當前鎖的狀態:0代表沒有被佔用,大於0代表鎖已被線程佔用(鎖可以重入,每次重入都+1)private transient Thread exclusiveOwnerThread; // 繼承自AbstractOwnableSynchronizer 持有當前鎖的線程

方法

// 鎖狀態getState()// 返回同步狀態的當前值;setState(int newState)// 設置當前同步狀態;compareAndSetState(int expect, int update)// 使用CAS設置當前狀態,保證狀態設置的原子性;// 獨佔鎖acquire(int arg)// 獨佔式獲取同步狀態,如果獲取失敗則插入同步隊列進行等待;acquireInterruptibly(int arg)// 與acquire(int arg)相同,但是該方法響應中斷;tryAcquireNanos(int arg,long nanos)// 在acquireInterruptibly基礎上增加了超時等待功能,在超時時間內沒有獲得同步狀態返回false;release(int arg)// 獨佔式釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中頭節點的下一個節點包含的線程喚醒;// 共享鎖acquireShared(int arg)// 共享式獲取同步狀態,與獨佔式的區別在於同一時刻有多個線程獲取同步狀態;acquireSharedInterruptibly(int arg)// 在acquireShared方法基礎上增加了能響應中斷的功能;tryAcquireSharedNanos(int arg, long nanosTimeout)// 在acquireSharedInterruptibly基礎上增加了超時等待的功能;releaseShared(int arg)// 共享式釋放同步狀態;// AQS使用模板方法設計模式// 模板方法,需要子類實現獲取鎖/釋放鎖的方法tryAcquire(int arg)// 獨佔式獲取同步狀態;tryRelease(int arg)// 獨佔式釋放同步狀態;tryAcquireShared(int arg)// 共享式獲取同步狀態;tryReleaseShared(int arg)// 共享式釋放同步狀態;

內部類

// 同步隊列的節點類static final class Node {}

2. 同步隊列

AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作。

如果當前線程獲取鎖失敗時,AQS會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會park當前線程;當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

隊列結構

同步隊列由雙向鍊表實現,AQS持有頭尾指針(head/tail屬性)來管理同步隊列。

節點的數據結構,即AQS的靜態內部類Node,包括節點對應的線程、節點的等待狀態等信息。

節點類:

static final class Node {volatile Node prev;// 當前節點/線程的前驅節點 volatile Node next;// 當前節點/線程的後繼節點 volatile Thread thread;// 每一個節點對應一個線程 volatile int waitStatus;// 節點狀態 static final int CANCELLED = 1;// 節點狀態:此線程取消了爭搶這個鎖 static final int SIGNAL = -1;// 節點狀態:當前node的後繼節點對應的線程需要被喚醒(表示後繼節點的狀態) static final int CONDITION = -2;// 節點狀態:當前節點進入等待隊列中 static final int PROPAGATE = -3;// 節點狀態:表示下一次共享式同步狀態獲取將會無條件傳播下去 Node nextWaiter;// 共享模式/獨佔模式 static final Node SHARED = new Node();// 共享模式 static final Node EXCLUSIVE = null;// 獨佔模式}

入隊操作

/*** 1.線程搶鎖失敗後,封裝成node加入隊列 * 2.隊列有tail,可直接入隊。 * 2.1入隊時,通過CAS將node置為tail。CAS操作失敗,說明被其它線程搶先入隊了,node需要通過enq()方法入隊。 * 3.隊列沒有tail,說明隊列是空的,node通過enq()方法入隊,enq()會初始化head和tail。 */private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);// 線程搶鎖失敗後,封裝成node加入隊列 Node pred = tail; if (pred != null) {// 如果有tail,node加入隊尾 node.prev = pred; if (compareAndSetTail(pred, node)) {// 通過CAS將node置為tail。CAS操作失敗,說明被其它線程搶先入隊了,node需要通過enq()方法入隊。 pred.next = node; return node; } } enq(node);// 如果沒有tail,node通過enq()方法入隊。 return node;}/** * 1.通過自旋的方式將node入隊,只有node入隊成功才返回,否則一直循環。 * 2.如果隊列為空,初始化head/tail,初始化之後再次循環到else分支,將node入隊。 * 3.node入隊時,通過CAS將node置為tail。CAS操作失敗,說明被其它線程搶先入隊了,自旋,直到成功。 */private Node enq(final Node node) { for (;;) {// 自旋:循環入列,直到成功 Node t = tail; if (t == null) { // 初始化head/tail,初始化之後再次循環到else分支,將node入隊 if (compareAndSetHead(new Node())) tail = head; } else { // node入隊 node.prev = t; if (compareAndSetTail(t, node)) {// 通過CAS將node置為tail。操作失敗,說明被其它線程搶先入隊了,自旋,直到成功。 t.next = node; return t; } } }}

3. 獲取鎖

以獨佔鎖為例詳細講解獲取鎖及排隊等待的過程。直接在代碼中加了詳細的注釋講解,耐心看一定可以看懂。

/*** 1.當前線程通過tryAcquire()方法搶鎖。 * 2.線程搶到鎖,tryAcquire()返回true,結束。 * 3.線程沒有搶到鎖,addWaiter()方法將當前線程封裝成node加入同步隊列,並將node交由acquireQueued()處理。 */public final void acquire(int arg) { if (!tryAcquire(arg) && // 子類的搶鎖操作,下文有解釋 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 子類搶鎖失敗進入隊列中,重點方法,下文詳細講解 selfInterrupt();}/** * 需要子類實現的搶鎖的方法 * 目前可以理解為通過CAS修改state的值,成功即為搶到鎖,返回true;否則返回false。 * 之後重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock中會詳細講解。 */protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}/** * 上文介紹過的入隊操作,線程搶鎖失敗,將當前線程封裝成node加入同步隊列,並返回node * Node.EXCLUSIVE-表示獨佔鎖,先不用關注 */addWaiter(Node.EXCLUSIVE)/** * 重點方法!! * 1.只有head的後繼節點能去搶鎖,一旦搶到鎖舊head節點從隊列中刪除,next被置為新head節點。 * 2.如果node線程沒有獲取到鎖,將node線程掛起。 * 3.鎖釋放時head節點的後繼節點喚醒,喚醒之後繼續for循環搶鎖。 */final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {// 注意這裡是循環 /* * 1.node的前置節點是head時,可以調用tryAcquire()嘗試去獲取鎖,獲取鎖成功則將node置為head * 注意:只有head的後繼節點能去搶鎖,一旦搶到鎖舊head節點從隊列中刪除,next被置為新head節點 * 2.node線程沒有獲取到鎖,繼續執行下面另一個if的代碼 * 此時有兩種情況:1)node不是head的後繼節點,沒有資格搶鎖;2)node是head的後繼節點但搶鎖沒成功 */ final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* * shouldParkAfterFailedAcquire(p, node):通過前置節點pred的狀態waitStatus 來判斷是否可以將node節點線程掛起 * parkAndCheckInterrupt():將當前線程掛起 * 1.如果node前置節點p.waitStatus==Node.SIGNAL(-1),直接將當前線程掛起,等待喚醒。 * 鎖釋放時會將head節點的後繼節點喚醒,喚醒之後繼續for循環搶鎖。 * 2.如果node前置節點p.waitStatus<=0但是不等於-1, * 1)shouldParkAfterFailedAcquire(p, node)會將p.waitStatus置為-1,並返回false; * 2)進入一下次for循環,先嘗試搶鎖,沒獲取到鎖則又到這裡,此時p.waitStatus==-1,就會掛起當前線程。 * 3.如果node前置節點p.waitStatus>0, * 1)shouldParkAfterFailedAcquire(p, node)為node找一個waitStatus<=0的前置節點,並返回false; * 2)繼續for循環 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}/** * 通過前置節點pred的狀態waitStatus 來判斷是否可以將node節點線程掛起 * pred.waitStatus==Node.SIGNAL(-1)時,返回true表示可以掛起node線程,否則返回false * @param pred node的前置節點 * @param node 當前線程節點 */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { /* * waitStatus>0 ,表示節點取消了排隊 * 這裡檢測一下,將不需要排隊的線程從隊列中刪除(因為同步隊列中保存的是等鎖的線程) * 為node找一個waitStatus<=0的前置節點pred */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 此時pred.waitStatus<=0但是不等於-1,那麼將pred.waitStatus置為Node.SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}/** * 將當前線程掛起 * LockSupport.park()掛起當前線程;LockSupport.unpark(thread)喚醒線程thread */private final boolean parkAndCheckInterrupt() { LockSupport.park(this);// 將當前線程掛起 return Thread.interrupted();}

AQS獲取鎖

4. 釋放鎖

/*** 釋放鎖之後,喚醒head的後繼節點next。 * 回顧上文講的acquireQueued()方法,next節點會進入for循環的下一次循環去搶鎖 */public final boolean release(int arg) { if (tryRelease(arg)) {// 子類實現的釋放鎖的方法,下文有講解 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);// 喚醒node節點(也就是head)的後繼節點,下文有講解 return true; } return false;}/** * 需要子類實現的釋放鎖的方法,對應於tryAcquire() * 目前可以理解為將state的值置為0。 * 之後重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock中會詳細講解。 */protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}/** * 喚醒node節點(也就是head)的後繼節點 */private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next;// 正常情況,s就是head.next節點 /* * 有可能head.next取消了等待(waitStatus==1) * 那麼就從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的去喚醒 */ if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);// 喚醒s節點的線程去搶鎖}

5. 回顧整個過程

線程1來獲取鎖,此時沒有競爭,直接獲取到鎖。AQS隊列為空。線程2來獲取鎖,因為線程1佔用鎖,線程2需要做兩件事:1)線程2構造成Node到AQS的同步隊列中排隊。此時初始化同步隊列。

2)線程2阻塞,等待被喚醒之後再去搶鎖。

線程3來獲取鎖,鎖被佔用,同樣做兩件事:排隊並阻塞。此時的同步隊列結構:

線程1執行完同步代碼之後釋放鎖,喚醒head的後繼節點(線程2),線程2獲取鎖,並把線程2對應的Node置為head。線程2執行完同步代碼之後釋放鎖,喚醒head的後繼節點(線程3),線程3獲取鎖,並把線程3對應的Node置為head。線程3執行完同步代碼之後釋放鎖,同步隊列中head之後沒有節點了,將head置為null即可。總結

AQS結構:鎖狀態

state

、當前只有鎖的線程

exclusiveOwnerThread

以及雙向鍊表實現的同步隊列。

AQS使用模板方法設計模式,子類必須重寫AQS獲取鎖

tryAcquire()

和釋放鎖

tryRelease()

的方法,一般是對

state

exclusiveOwnerThread

的操作。

獲取鎖

acquire()

過程:

子類調用tryAcquire()嘗試獲取鎖,如果獲取鎖成功,完成。如果獲取鎖失敗,當前線程會封裝成Node節點插入同步隊列中,並且將當前線程park()阻塞,等待被喚醒之後再搶鎖。釋放鎖

release()

過程:當前線程調用子類的

tryRelease()

方法釋放鎖,釋放鎖成功後,會

unpark(thread)

喚醒

head

的後繼節點,讓其再去搶鎖。

相關焦點

  • 「原創」Java並發編程系列28|Copy-On-Write容器
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫正文前面兩篇講了並發編程中線程安全HashMap:ConcurrentHashMap
  • 「原創」Java並發編程系列18|讀寫鎖(下)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 18 篇,文末有本系列文章匯總。>記錄讀寫鎖狀態源碼分析讀鎖的獲取與釋放源碼分析寫鎖的獲取與釋放鎖降級讀寫鎖應用本文涉及到上下文聯繫較多,經常需要上下滑動查看
  • 原創】Java並發編程系列01|開篇獲獎感言
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫為什麼要學並發編程我曾聽一個從事15年開發工作的技術人員說過,他剛工作時的並發編程第一原則就是不要寫並發程序。
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫寫在前面編寫並發程序是比較困難的,因為並發程序極易出現Bug,這些Bug有都是比較詭異的,很多都是沒辦法追蹤,而且難以復現。
  • Java並發編程系列20|StampedLock源碼解析
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第 20 篇,文末有本系列文章匯總。上一篇介紹了StampedLock存在的意義以及如何使用,按照這個系列的風格大家也應該猜到了,這一篇就是的源碼分析。
  • 「原創」Java並發編程系列06|你不知道的final
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫final在Java中是一個保留的關鍵字,可以修飾變量、方法和類。那麼fianl在並發編程中有什麼作用呢?本文就在對final常見應用總結基礎上,講解final並發編程中的應用。
  • 「原創」Java並發編程系列03|重排序-可見性和有序性問題根源
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫思維導圖寫在前面並發編程的三大問題:原子性、可見性、有序性。從java原始碼到最終實際執行的指令序列,會分別經歷下面三種重排序:編譯器優化的重排序。編譯器在不改變單線程程序語義的前提下,可以重新安排語句的執行順序。指令級並行的重排序。
  • 「原創」Java並發編程系列29|ConcurrentLinkedQueue
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫J.U.C 為常用的集合提供了並發安全的版本,前面講解了 map 的並發安全集合 ConcurrentHashMap,List 並發安全集合 CopyOnWriteArrayList,Set 並發安全集合
  • JAVA並發編程:並發問題的根源及主要解決方法
    而在java中,不變性變量即通過final修飾的變量,如String,Long,Double等類型都是Immutability的,它們的內部實現都是基於final關鍵字的。那這又和並發編程有什麼關係呢?其實啊,並發問題很大部分原因就是因為線程切換破壞了原子性,這又導致線程隨意對變量的讀寫破壞了數據的一致性。
  • Java並發編程系列21|Condition-Lock的等待通知
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫我們知道 synchronized 鎖通過 Object 類的 wait()和 notify()方法實現線程間的等待通知機制,而比 synchronized 更靈活 Lock 鎖同樣也有實現等待通知機制的方式
  • 「原創」Java並發編程系列33|深入理解線程池(上)
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫並發編程必不可少的線程池,接下來分兩篇文章介紹線程池,本文是第一篇。介紹1.1 使用場景並發編程可以高效利用CPU資源,提升任務執行效率,但是多線程及線程間的切換也伴隨著資源的消耗。當遇到單個任務處理時間比較短,但需要處理的任務數量很大時,線程會頻繁的創建銷毀,大量的時間和資源都會浪費在線程的創建和銷毀上,效率很低。
  • 「原創」Java並發編程系列36|FutureTask
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫線程池源碼中出現了很多Callable、Future、FutureTask等以前沒介紹過的接口,尤其是線程池提交任務時總是把任務封裝成FutureTask,今天就來為大家解惑:
  • NO.001- 簡說 Java 並發編程史
    這篇文章是Java並發編程思想系列的第一篇,主要從理解Java並發編程歷史的原因和Java並發演進過程兩部分,以極簡地回溯並發編程的歷史,幫助大家從歷史這個角度去了解一門語言一個特性的演進。對歷史理解的越多,思考的越多,未來的方向就會更加堅定。我是誰?從哪來?到哪去?
  • Java並發編程系列23|循環屏障CyclicBarrier
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本篇介紹第二個並發工具類CyclicBarrier,CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),分以下部分介紹:
  • java集合之LinkedList源碼分析
    一、查看類的繼承關係關於Cloneable,java.io.Serializable兩個接口和為什麼沒有實現RandomAccess接口,我們上次已經說過了,如果不是很理解,請參考上一篇文章:《java集合之ArrayList源碼分析》這次重點把LinkedList源碼看一遍先簡單說明:LinkedList 內部是一個鍊表LinkedList首先會繼承AbstractSequentialList
  • Java並發工具三劍客之CountDownLatch源碼解析
    CountDownLatch是Java並發包下的一個工具類,latch是門閂的意思,顧名思義,CountDownLatch就是有一個門閂擋住了裡面的人(線程)出來,當count減到0的時候,門閂就打開了,人(線程)就可以出來了。
  • 編程貓「kitten源碼編輯器」0到1的關鍵點設計
    當前關於產品分析類文章有兩大主流分析視角,一類是著重宏觀視角,不做過多產品細節分析;另一類是較為模板化的產品分析,從行業分析到功能點體驗拆分基本全部囊括,功能點拆分較為全面細緻。我們今天不從宏觀角度來分析行業格局以及編程貓的發展歷程,我將站在產品視角上,從頭拆解編程貓的第一款產品 ——「kitten源碼編輯器」,試圖回答這樣一個問題:這樣一款面向少兒用戶的工具類產品,面對產品0到1過程中的關鍵環節,如果是我會怎樣思考和規劃?關鍵點1:為什麼要花巨大的成本來開發少兒編程工具?
  • Github上星標85.7k的並發編程神筆記也太香了吧!
    並發編程並發編程可選擇的方式有多進程、多線程和多協程。對於Java來說,它既不像C++那樣,在運行中調用Linux的系統API去"fork" 出多個進程:也不像Go那樣,在語言層面原生提供多協程。在Java中, 並發就是多線程模式。對於人腦的認知來說,「代碼一行行串行」當然最容易理解。但在多線程下,多個線程的代碼交叉並行,要訪問互斥資源,要互相通信。作為開發者,需要仔細設計線程之間的互斥與同步,稍不留心,就會寫出非線程安全的代碼。正因此,多線程編程一直是一個被廣泛而深入討論的領域。
  • 超硬核的Java學習路線指南,看完以後不用再問我怎麼學Java了!
    我們都知道編程技術語言很多,如當下比較火的程式語言就有java,python,javascript,php等語言,而今天我們就來講一講熱門程式語言Java,因為現階段我們的java程序還是很厲害的,不管是大型項目、高並發上億的數據量還是操作小項目,其穩定性,安全性都是數一數二的,非常nice!!