本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫
本篇介紹第三個並發工具類Semaphore,Semaphore可以理解為信號量,用於控制資源能夠被並發訪問的線程數量,以保證多個線程能夠合理的使用特定資源。分以下部分介紹:
Semaphore的使用Semaphore源碼解析1. Semaphore的使用
Semaphore管理著一組許可permit,許可的初始數量通過構造函數設定。
當線程要訪問共享資源時,需要先通過acquire()方法獲取許可。獲取到之後許可就被當前線程佔用了,在歸還許可之前其他線程不能獲取這個許可。
調用acquire()方法時,如果沒有許可可用了,就將線程阻塞,等待有許可被歸還了再執行。
當執行完業務功能後,需要通過release()方法將許可證歸還,以便其他線程能夠獲得許可證繼續執行。
如果初始化了一個許可為1的Semaphore,那麼就相當於一個不可重入的互斥鎖(Mutex)。
舉個例子理解一下:
我們假設停車場僅有3個停車位,停車位就是有限的共享資源,許可數為3。一開始停車場沒有車輛所有車位全部空著,然後先後到來三輛車,停車場車位夠,安排進去停車。之後來的車必須在外面候著,直到停車場有空車位。當停車場有車開出去,裡面有空位了,則安排一輛車進去(至於是哪輛要看選擇的機制是公平還是非公平)。
從程序角度看,停車場就相當於有限的公共資源,許可數為3,車輛就相當於線程。當來一輛車時,許可數就會減1,當停車場沒有車位了(許可數為0),其他來的車輛需要在外面等候著。如果有一輛車開出停車場,許可數+1,然後放進來一輛車。
代碼實現如下:
public class SemaphoreTest {public static void main(String[] args) { Parking parking = new Parking(3);// 只能停3輛車的停車場 for (int i = 0; i < 8; i++) { // 每一個線程表示一輛車到停車場停車 new Thread() { public void run() { parking.park();// 進入停車場 }; }.start(); } } static class Parking { private Semaphore semaphore;// 信號量 Parking(int count) { semaphore = new Semaphore(count); } public void park() { try { semaphore.acquire();// 獲取許可 long time = (long) (Math.random() * 10); System.out.println(Thread.currentThread().getName() + "進入停車場,停車" + time + "秒..."); Thread.sleep(time);// 獲取許可 System.out.println(Thread.currentThread().getName() + "開出停車場..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } }}
輸出結果:
Thread-2進入停車場,停車5秒...Thread-1進入停車場,停車7秒...Thread-0進入停車場,停車3秒...Thread-0開出停車場...Thread-3進入停車場,停車6秒...Thread-2開出停車場...Thread-4進入停車場,停車3秒...Thread-1開出停車場...Thread-5進入停車場,停車7秒...Thread-4開出停車場...Thread-6進入停車場,停車8秒...Thread-3開出停車場...Thread-7進入停車場,停車0秒...Thread-7開出停車場...Thread-5開出停車場...Thread-6開出停車場...
Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如資料庫連接。假如有多個線程讀取數據後,需要將數據保存在資料庫中,而可用的最大資料庫連接只有10個,這時候就需要使用Semaphore來控制能夠並發訪問到資料庫連接資源的線程個數最多只有10個。在限制資源使用的應用場景下,Semaphore是特別合適的。
2. 源碼分析
2.1 類結構
Semaphore同樣是由AQS實現的,用內部類Sync來管理鎖,Sync有兩個實現,分別為NonfairSync(非公平鎖)和FairSync(公平鎖)。
這個類結構有沒有似曾相識的感覺,重入鎖ReentrantLock也是同樣的類結構,Semaphore的源碼跟ReentrantLock有很多相似但又比ReentrantLock簡單。
public class Semaphore implements java.io.Serializable {private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {}}
看下構造方法,設置許可數permits其實就是將AQS.state設置為permits。
public Semaphore(int permits) {sync = new NonfairSync(permits);}NonfairSync(int permits) { super(permits);}Sync(int permits) { setState(permits);}
2.2 acquire()
acquire()方法就是獲取許可,獲取到許可就可以繼續執行訪問共享資源,獲取不到就阻塞等待其他線程歸還許可。
AQS.state用來記錄可用的許可數量,每獲取一個許可state減1。
/*** 獲取許可的方法其實就是獲取鎖的方法 */public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 響應打斷 if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 真正獲取鎖的方法,由Semaphore.NonfairSync實現 doAcquireSharedInterruptibly(arg); // 獲取鎖失敗,當前線程阻塞並進入AQS同步隊列}/** * Semaphore.NonfairSync實現的獲取鎖的方法 */protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);}/** * 每獲取一個許可,將state-1,state表示剩餘的許可數 * 如果許可已經用完,返回remaining<0,表示獲取不到鎖/許可,線程阻塞 * 如果還有許可,返回remaining>=0,表示獲取到鎖/許可,線程繼續執行 */final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires;// 每獲取一個許可,將state-1,state表示剩餘的許可數 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }}
2.3 release()
release()方法歸還許可,其實就是將AQS.state加1。歸還成功,喚醒AQS隊列中等鎖的線程,從被阻塞的位置開始執行。
/*** 釋放許可調用釋放鎖的方法 */public void release() { sync.releaseShared(1);}/** * 釋放鎖,完全成功,依次喚醒AQS隊列中等待共享鎖的線程 */public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 釋放鎖,由Semaphore.Sync實現 doReleaseShared(); // 釋放鎖成功,喚醒AQS隊列中等鎖的線程 return true; } return false;}/** * 每歸還一個許可將state加1 */protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases;// 每歸還一個許可將state加1 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; }}
3. 總結
信號量Semaphore用於控制資源能夠被並發訪問的線程數量,以保證多個線程能夠合理的使用特定資源,比如資料庫連接等。
Semaphore在構造時設置一個許可數量,這個許可數量用AQS.state來記錄。
acquire()方法就是獲取許可,只有獲取到許可才可以繼續執行訪問共享資源,獲取到許可之後AQS.state減1,以記錄當前可用的許可數量;如果獲取不到許可,線程就阻塞等待其他線程歸還許可。
release()方法將許可歸還,AQS.state加1;歸還之後,喚醒AQS隊列中阻塞的線程獲取許可。