導讀:文章內容較多,也有不少代碼,但是作者寫的也很認真,對理解並發編程會有幫助,值得一讀。
閱讀完大約需要15分鐘,如果對 linux 實在不太感冒,也可以選擇性從 double-check 章節開始看起。
原文開始:
看 Linux 的 wait_event 源碼時,聯想到我們平時經常用得比較多的 wait/notify、double-check 和 volatile,突然意識 wait_event 簡簡單單幾行代碼的背後,涉及的知識點其實非常豐富。本篇文章我們就一起了來探索它背後的知識,然後嘗試著和我們的日常開發關聯起來。
wait_event這裡使用 Linux-2.6.24 版本的源碼
背景在某些情況下,我們會需要等待某個事件,在這個事件發生前,把進程投入睡眠。比方說,同步寫 IO;在發出寫磁碟命令後,進程要進入休眠,等等磁碟完成。為了支持這一類場景,Linux 引入了 wait queue;wait queue 從概念上跟我們應用層使用的 condition queue 是一樣的。
實現這裡我們著重講 wait_event 的實現,一些相關的知識讀者可以參考《深入理解LINUX內核》。
下面我們開始看代碼:
1
2
3
15#define wait_event(wq, condition) \
16do { \
17 if (condition) \
18 break; \
19 __wait_event(wq, condition); \
20} while (0)
這裡只是先檢測一遍條件,然後直接又調用 __wait_event:
1
2#define __wait_event(wq, condition) \
3do { \
4 DEFINE_WAIT(__wait); \
5
6 for (;;) { \
7 prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \
8 if (condition) \
9 break; \
10
11
12 schedule(); \
13 } \
14 finish_wait(&wq, &__wait); \
15} while (0)
DEFINE_WAIT 宏用於定義局部變量 __wait:
1
2#define DEFINE_WAIT(name) \
3 wait_queue_t name = { \
4 .private = current, \
5 .func = autoremove_wake_function, \
6 .task_list = LIST_HEAD_INIT((name).task_list), \
7 }
prepare_to_wait 和 finish_wait 源碼如下:
1
2
14void fastcall
15prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
16{
17 unsigned long flags;
18
19
20 wait->flags &= ~WQ_FLAG_EXCLUSIVE;
21
22 spin_lock_irqsave(&q->lock, flags);
23
24
25 if (list_empty(&wait->task_list))
26 __add_wait_queue(q, wait);
27
31
32 if (is_sync_wait(wait))
33
34
35 set_current_state(state);
36
37 spin_unlock_irqrestore(&q->lock, flags);
38}
39
40
47#define is_sync_wait(wait) (!(wait) || ((wait)->private))
48
49void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
50{
51 unsigned long flags;
52
53 __set_current_state(TASK_RUNNING);
54
67 if (!list_empty_careful(&wait->task_list)) {
68 spin_lock_irqsave(&q->lock, flags);
69 list_del_init(&wait->task_list);
70 spin_unlock_irqrestore(&q->lock, flags);
71 }
72}
概括來講,prepare_to_wait 把自己加入等待隊列,finish_wait 則把自己從隊列裡移除。但由於 prepare_to_wait 可能會被調用多次,如果判斷 wait 已經處於某個隊列中,則不會重複添加。
條件、條件隊列和鎖對於像我一樣平時使用 Java 比較多的讀者,對下面這一段代碼一定不會覺得陌生:
1synchronized (this) {
2 while (!condition) {
3 wait();
4 }
5
6}
這裡我們不禁要問,應用層的代碼可以這樣簡潔,為什麼內核的就不行?這裡我們先來大概了解一下條件隊列,然後再回答這個問題。
所謂的條件隊列/等待隊列,一般由 3 個成分組成:
一個隊列,用於存放等待條件/事件的線程。在應用層,一般我們叫他條件隊列(condition queue),LINUX 內核叫他 wait queue
一個鎖,用於保護這個隊列
一個謂詞(它的計算結果為 bool 值)用作條件,即前面示例代碼的 condition。
Java 程式設計師們在這裡需要特別注意的是,我說的鎖的作用是保護條件隊列。回顧我們常寫的 Java 代碼,一般這個鎖也用來保護謂詞,但這個不是必須的。Java 要求我們在調用 wait 的時候必須持有鎖的原因之一是,wait 的內部會把當前線程加入條件隊列;修改列表必須持有鎖(另一個原因是,wait 的語義之一便是執行後會釋放鎖,如果都不持有,何來的釋放呢)。
但在另一面,喚醒條件隊列上的線程卻不一定需要持有鎖,雖然 Java 要求我們必須持有鎖才能調用 notify。持有鎖調用 notify 的好處在於,notify 後條件不會改變。同時,如果持有鎖的話,這個操作裡也可以把相關線程從條件隊列裡刪除。不好的地方在於,調用 notify 的線程在執行喚醒操作的時候還持有鎖,被喚醒線程這個時候如果被內核調度,他的獲取鎖的操作將失敗(會導致該線程又進入睡眠狀態)。這種實現方式性能上可能差一點,但代碼更安全。
不要求調用 notify 時持有鎖的一個例子是 pthread。這種方式的問題在於,在 notify 還沒執行完的時候,條件可能就發生了變化。可能的實現是,只設置線程為可執行狀態,等線程獲得鎖後自己把自己從隊列裡面移除。
了解了相關的數據結構後,不難猜想 Java 裡 wait 的實現。考慮一種應用層 wait 的實現如下:
1void wait() {
2 add_to_condition_queue();
3 unlock();
4 schedule();
5}
把 wait 方法做一下內聯(inlining)處理,可以得到:
1lock();
2while (!condition) {
3 add_to_condition_queue();
4 unlock();
5 schedule();
6}
7
對比一下內核的 wait_event:
1void our_wait_event() {
2 if (condition) return;
3
4 for (;;) {
5
6 add_to_wait_queue_if_not_added_yet();
7 if (condition)
8 break;
9 schedule();
10 }
11 remove_from_wait_queue();
12}
可以看到,內核把代碼寫得更複雜的好處在於,它在切換進程前可以再檢查一次條件,如果條件已經滿足,就不需要執行 schedule 了。切換進程需要保存當前進程的上下文,同時會導致 TLB、Cache 等一系列緩存時效,因此內核總是儘量避免不必要的線程切換,而代價就是更複雜的代碼。
double-check首先,如果你也和我一樣覺得 our_wait_event 裡面兩個 if 有點難看,我們不妨試著來給他改一改:
1void our_wait_event2() {
2 while (!condition) {
3 add_to_wait_queue_if_not_added_yet();
4 schedule();
5 }
6 remove_from_wait_queue();
7}
咋一看好像沒什麼問題,都是一樣的檢測條件,在條件不滿足的情況下加入等待隊列,調用 schedule。重要的是,上面這段代碼更簡潔,更易讀。那麼,他正確嗎?
不消說,肯定是有問題的,不然那班內核程式設計師不會不知道該這麼寫。那問題究竟出在哪裡呢?
考慮下面兩個執行流:
1thread1 thread2
2
3check condition => false
4add_to_wait_queue()
5
6 alter_condition()
7 notify_all()
8
9state = TASK_UNINTERRUPTABLE
10schedule()
thread1 在把自己加入等待隊列後,schedule 前,thread2 就更改了條件並且調用 notify。在這種情況下,如果沒有其他線程再次調用 notify,thread1 將會永遠休眠(而 thread2 認為自己已經 noitfy 過 thread1 了)。
為了防止發生這種情況,在添加到等待隊列後,thread1 還應該再檢查一次條件,如果條件滿足,直接把自己從隊列裡移除就可以了。
為了方便讀者把 wait_event 和 double-check 聯繫起來,下面我們看一段使用 double-check 實現的 Java 的單例的例子:
1public static SomeClass getInstance() {
2 if (sInstance == null) {
3 synchronized (SomeClass.class) {
4 if (sInstance != null) {
5 sInstance = new SomeClass();
6 }
7 }
8 }
9 return sInstance;
10}
兩者的共同點都是先檢測一遍條件是否成立,然後設置一個「安全閥」。在持有這個安全閥時,再一次檢測條件是否滿足。double-check 的多線程安全性都源於這個安全閥。就 wait_event 來說,當我們把自己加入等待隊列後,就可以保證不會丟失另一個線程的 notify。而創建單例時,加鎖保證了第二次判斷後不會有另一個線程同時創建對象。(可能說得有點抽象,如果讀者不明白,直接跳過就好。只要讀者能夠完成下面的小測驗,那就是懂得 double-check 的)。
double-check 小測驗假設有這樣一個方法,他可以用來下載文件:
1interface DownloadCallback {
2 void onSuccess(File file);
3}
4
5public void download(String url, DownloadCallback callback) {
6
7}
我們又假設,可能同時有多個客戶會調用這個接口下載同一個文件。為了避免同時下載同一個文件,我們可以在下載時判斷一下當前是否已經有任務在下載:
1interface DownloadCallback {
2 void onSuccess(File file);
3}
4
5private class DownloadTask {
6
7 private final List<DownloadCallback> mCallbacks = new ArrayList<>();
8
9 public DownloadTask(String url, DownloadCallback callback) {
10 mCallbacks.add(callback);
11 }
12
13 public void download() {
14
15 File file = new File("downloaded-file");
16
17
18 List<DownloadCallback> copy;
19 synchronized (mCallbacks) {
20 copy = new ArrayList<>(mCallbacks);
21 mCallbacks.clear();
22 }
23 for (DownloadCallback callback : copy) {
24 callback.onSuccess(file);
25 }
26 }
27
28 public void addCallback(DownloadCallback callback) {
29 synchronized (mCallbacks) {
30 mCallbacks.add(callback);
31 }
32 }
33}
34
35
36private final ConcurrentMap<String, DownloadTask> mTasks = new ConcurrentHashMap<>();
37
38public void download(String url, DownloadCallback callback) {
39 File file = new File(getDestFilePath(url));
40 if (file.exists()) {
41
42 callback.onSuccess(file);
43 return;
44 }
45 DownloadTask task = new DownloadTask(url, callback);
46 DownloadTask downloadingTask = mTasks.putIfAbsent(url, task);
47 if (downloadingTask == null) {
48
49 task.download();
50 return;
51 }
52
53 downloadingTask.addCallback(callback);
54}
55
56private String getDestFilePath(String url) {
57 return "url-to-file-path...";
58}
為了檢驗讀者是否真正理解 wait_event,你可以嘗試著解決上面代碼裡存在的競爭條件。如果一時沒能發現其中的問題,建議讀者再從頭讀一遍文章。為了鼓勵讀者獨立思考、與他人交流,這裡我就順勢偷個懶不公布答案了。畢竟,在實際工作中,可不是總會有人告訴你你的代碼寫得是否正確。
內存屏障一瞥所謂的內存屏障,主要分為 3 種:
read memory barrier(rmb),保證屏障前的讀發生在屏障後的讀操作之前
write memory barrier(wmb),保證屏障前的寫發生在屏障後的寫操作之前
full memory barrier(mb),保證屏障前的讀寫操作發生在屏障後的讀寫操作之前
前面 prepare_to_wait 有這麼一段注釋:
1
這段注釋一開始我也是看得雲裡霧裡,直到我找到了他們解決一個內核 bug 的郵件(Google 大法好)。
這裡面的 tests for the wait-queue being active 可以根據 waitqueue_active 來理解,其實指的就是等待隊列不為空。spin-lock 雖然可以防止數據競爭,但如果別人在檢查的時候不去獲取鎖呢?(waitqueue_active 就沒有加鎖)。當然,不加鎖可以獲得更好的性能。
j
1static inline int waitqueue_active(wait_queue_head_t *q)
2{
3 return !list_empty(&q->task_list);
4}
set_current_state 使用 set_mb 來設置當前進程的狀態。
1
12#define __set_current_state(state_value) \
13 do { current->state = (state_value); } while (0)
14#define set_current_state(state_value) \
15 set_mb(current->state, (state_value))
下面是文檔對 set_mb 的描述:
1
set_current_state 在設置當前進程的狀態後,會插入一個 mb。前面我們了解到,這將禁止 CPU 將 set_current_state 後面的 load/store 提前。
為了理解完全理解這裡 set_mb 的使用,我們還需要再參考一下 wake_up 函數。但由於篇幅關係,這裡我只是簡單介紹它的實現:
1for_each_process_in_wait_queue_without_lock:
2 if process.state is sleeping:
3 wake it up
4 remove it from wait-queue
假設在 prepare_to_wait 裡面我使用的是平凡的 __set_current_state,那麼 CPU 就可以把 prepare_to_wait 函數返回後我們所執行的對條件的判斷提前到設置進程狀態前。這種情況下,如果發生以下的執行序列,CPU2 將會丟失一個 wake-up,他有可能會永遠休眠。
1CPU0 CPU1
2- ---
3 check_condition() => false
4condition = true
5wake_up()
6 __set_current_state()
7 schedule()
解決辦法就是引入一個 mb。在下面的例子中,如果 __set_current_state() 在 wake_up() 後執行,CPU1 上的這個線程將不會被喚醒,但隨後的 check_condition() 會正確返回 true。反過來,如果 __set_current_state() 在 wake_up() 前執行,check_condition() 可能返回 true 也可能返回 false,但無論如何,他都不會丟失隨後的 wake_up()。
1CPU0 CPU1
2- ---
3condition = true
4wake_up()
5 __set_current_state()
6 mb();
7 check_condition() => true
還記得嗎,set_current_state 就是在設置進程狀態後插入一個內存屏障,所以 prepare_to_wait 直接使用 set_current_state 就可以了。
現在,我們終於可以說自己完全理解 wait_event 了。也許讀者是第一次接觸內存屏障,但我敢保證,很多 Java 程式設計師在不知不覺中使用過一定形式上的屏障。下面我們看一個例子:
1private int mSomeData;
2private int mSomeOtherData;
3private volatile boolean mDataSet;
4
5public void foo() {
6 if (mDataSet) {
7
8 }
9}
10
11public void bar() {
12 mSomeData = 1;
13 mSomeOtherData = 2;
14 mDataSet = true;
15}
有一定開發經驗的讀者很可能看過類似的代碼。雖然我們沒有在 mSomeData 和 mSomeOtherData 的讀寫上做顯式的同步,但只要仔細編寫代碼,利用一個 volatile 變量 mDataSet,這段代碼也可以是線程安全的。
為了避免引入內存屏障這個比較複雜的概念(並且提供更好的移植性),Java 使用一個 happens-before 來描述相關的概念。關於 volatile 有這麼一條描述:
A write to a volatile field happens-before every subsequent read of that field.
另外,對於同一個線程,有:
If x and y are actions of the same thread and x comes before y in program order, then x happen before y.
同時,happens-before 具有傳遞性(x -> y, y -> z, x -> z),所以就有了下面這個結論:
對 mSomeData, mSomeOtherData 的寫操作在 mDataSet = true 之前;mDataSet = true 在隨後另一個線程對他的讀操作之前;所以 mSomeData, mSomeOtherData 的寫操作在隨後對 mDataSet 的讀操作之前。
直白一點說,只要一個線程看到 mDataSet 為 true,那他就一定能夠正確讀取到 mSomeData, mSomeOtherData 的值。
如果顯式使用內存屏障,上面的代碼就相當於:
1private int mSomeData;
2private int mSomeOtherData;
3private boolean mDataSet;
4
5public void foo() {
6 if (mDataSet) {
7
8
9 rmb();
10
11 }
12}
13
14public void bar() {
15 mSomeData = 1;
16 mSomeOtherData = 2;
17
18 wmb();
19 mDataSet = true;
20}
最後,牆裂推薦一本並行編程的神書《Is Parallel Programming Hard, And, If So, What Can You Do About It?》(可以免費獲取),書裡有關於內存屏障的最好的描述。
原文完。
歡迎各位踴躍投稿!
點讚轉發分享走一波,爭做優秀的船員!