詳解 C++ 多線程的condition_variable

2021-03-02 CPP開發者

一、condition_variable條件變量的介紹

std::condition_variable 是條件變量,更多有關條件變量的定義參考維基百科。Linux 下使用 Pthread 庫中的 pthread_cond_*() 函數提供了與條件變量相關的功能, Windows 則參考 MSDN  。

當 std::condition_variable 對象的某個 wait 函數被調用的時候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當前線程。當前線程會一直被阻塞,直到另外一個線程在相同的 std::condition_variable 對象上調用了 notification 函數來喚醒當前線程。

std::condition_variable對象通常使用std::unique_lock<std::mutex>來等待,如果需要使用另外的lockable類型,可以使用 std::condition_variable_any 類,本文後面會講到 std::condition_variable_any 的用法。

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標誌位.

void do_print_id(int id)
{
 std::unique_lock <std::mutex> lck(mtx);
 while (!ready) // 如果標誌位不為 true, 則等待...
  cv.wait(lck); // 當前線程被阻塞, 當全局標誌位變為 true 之後,
 // 線程被喚醒, 繼續往下執行列印線程編號id.
 std::cout << "thread " << id << '\n';
}

void go()
{
 std::unique_lock <std::mutex> lck(mtx);
 ready = true; // 設置全局標誌位為 true.
 cv.notify_all(); // 喚醒所有線程.
}

int main()
{
 std::thread threads[10];
 // spawn 10 threads:
 for (int i = 0; i < 10; ++i)
  threads[i] = std::thread(do_print_id, i);

 std::cout << "10 threads ready to race...\n";
 go(); // go!

 for (auto & th : threads)
  th.join();

 return 0;
}

好了,對條件變量有了一個基本的了解之後,我們來看看 std::condition_variable 的各個成員函數。

二、std::condition_variable 構造函數

default (1)

condition_variable();

copy [deleted] (2)

condition_variable (const condition_variable&) = delete;

std::condition_variable 的拷貝構造函數被禁用,只提供了默認構造函數。

三、std::condition_variable::wait() 介紹

unconditional (1)

void wait (unique_lock<mutex>& lck);

predicate (2)

template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);

std::condition_variable 提供了兩種 wait() 函數。當前線程調用 wait() 後將被阻塞(此時當前線程應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程。

在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競爭上的線程得以繼續執行。另外,一旦當前線程獲得通知(notified,通常是另外某個線程調用 notify_* 喚醒了當前線程),wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態和 wait 函數被調用時相同。

在第二種情況下(即設置了 Predicate),只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知後只有當 pred 為 true 時才會被解除阻塞。因此第二種情況類似以下代碼:

while (!pred()) wait(lck);

#include <iostream>                // std::cout
#include <thread>                // std::thread, std::this_thread::yield
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available()
{
 return cargo != 0;
}

// 消費者線程.
void consume(int n)
{
 for (int i = 0; i < n; ++i) 
 {
  std::unique_lock <std::mutex> lck(mtx);
  cv.wait(lck, shipment_available);
  std::cout << cargo << '\n';
  cargo = 0;
 }
}

int main()
{
 std::thread consumer_thread(consume, 10); // 消費者線程.

 // 主線程為生產者線程, 生產 10 個物品.
 for (int i = 0; i < 10; ++i) 
 {
  while (shipment_available())
  {
   std::this_thread::yield();
  }
  std::unique_lock <std::mutex> lck(mtx);
  cargo = i + 1;
  cv.notify_one();
 }

 consumer_thread.join();

 return 0;
}

四、std::condition_variable::wait_for() 介紹

unconditional (1)

template <class Rep, class Period>
cv_status wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time);

predicate (2)

template <class Rep, class Period, class Predicate>
bool wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time, Predicate pred);

std::condition_variable::wait() 類似,不過 wait_for 可以指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_for 返回,剩下的處理步驟和 wait() 類似。

另外,wait_for 的重載版本(predicte(2))的最後一個參數 pred 表示 wait_for 的預測條件,只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知後只有當 pred 為 true 時才會被解除阻塞,因此相當於如下代碼:

return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));

請看下面的例子(參考),下面的例子中,主線程等待 th 線程輸入一個值,然後將 th 線程從終端接收的值列印出來,在 th 線程接受到值之前,主線程一直等待,每個一秒超時一次,並列印一個 ".":

#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void do_read_value()
{
 std::cin >> value;
 cv.notify_one();
}

int main()
{
 std::cout << "Please, enter an integer (I'll be printing dots): \n";
 std::thread th(do_read_value);

 std::mutex mtx;
 std::unique_lock<std::mutex> lck(mtx);
 while (cv.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout) 
 {
  std::cout << '.';
  std::cout.flush();
 }

 std::cout << "You entered: " << value << '\n';

 th.join();
 return 0;
}

五、std::condition_variable::wait_until 介紹

unconditional (1)

template <class Clock, class Duration>
cv_status wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time);

predicate (2)

template <class Clock, class Duration, class Predicate>
bool wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time,
Predicate pred);

std::condition_variable::wait_for 類似,但是 wait_until 可以指定一個時間點,在當前線程收到通知或者指定的時間點 abs_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_until 返回,剩下的處理步驟和 wait_until() 類似。

另外,wait_until 的重載版本(predicte(2))的最後一個參數 pred 表示 wait_until 的預測條件,只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知後只有當 pred 為 true 時才會被解除阻塞,因此相當於如下代碼:

while (!pred()) if ( wait_until(lck,abs_time) == cv_status::timeout) return pred(); return true;

六、std::condition_variable::notify_one() 介紹

喚醒某個等待(wait)線程。如果當前沒有等待線程,則該函數什麼也不做,如果同時存在多個等待線程,則喚醒某個線程是不確定的(unspecified)。

請看下例(參考):

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0; // shared value by producers and consumers

void consumer()
{
 std::unique_lock < std::mutex > lck(mtx);
 while (cargo == 0)
  cv.wait(lck);
 std::cout << cargo << '\n';
 cargo = 0;
}

void producer(int id)
{
 std::unique_lock < std::mutex > lck(mtx);
 cargo = id;
 cv.notify_one();
}

int main()
{
 std::thread consumers[10], producers[10];

 // spawn 10 consumers and 10 producers:
 for (int i = 0; i < 10; ++i) {
  consumers[i] = std::thread(consumer);
  producers[i] = std::thread(producer, i + 1);
 }

 // join them back:
 for (int i = 0; i < 10; ++i) {
  producers[i].join();
  consumers[i].join();
 }

 return 0;
}

std::condition_variable::notify_all() 介紹

喚醒所有的等待(wait)線程。如果當前沒有等待線程,則該函數什麼也不做。請看下面的例子:

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標誌位.

void do_print_id(int id)
{
 std::unique_lock <std::mutex> lck(mtx);
 while (!ready) // 如果標誌位不為 true, 則等待...
  cv.wait(lck); // 當前線程被阻塞, 當全局標誌位變為 true 之後,
 // 線程被喚醒, 繼續往下執行列印線程編號id.
 std::cout << "thread " << id << '\n';
}

void go()
{
 std::unique_lock <std::mutex> lck(mtx);
 ready = true; // 設置全局標誌位為 true.
 cv.notify_all(); // 喚醒所有線程.
}

int main()
{
 std::thread threads[10];
 // spawn 10 threads:
 for (int i = 0; i < 10; ++i)
  threads[i] = std::thread(do_print_id, i);

 std::cout << "10 threads ready to race...\n";
 go(); // go!

 for (auto & th : threads)
  th.join();

 return 0;
}

std::condition_variable_any 介紹

與 std::condition_variable 類似,只不過 std::condition_variable_any 的 wait 函數可以接受任何 lockable 參數,而 std::condition_variable 只能接受 std::unique_lock<std::mutex> 類型的參數,除此以外,和 std::condition_variable 幾乎完全一樣。

std::cv_status 枚舉類型介紹

cv_status::no_timeoutwait_for 或者 wait_until 沒有超時,即在規定的時間段內線程收到了通知。cv_status::timeoutwait_for 或者 wait_until 超時。

std::notify_all_at_thread_exit

函數原型為:

void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);

#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
 std::unique_lock<std::mutex> lck(mtx);
 while (!ready) cv.wait(lck);
 // ...
 std::cout << "thread " << id << '\n';
}

void go() {
 std::unique_lock<std::mutex> lck(mtx);
 std::notify_all_at_thread_exit(cv, std::move(lck));
 ready = true;
}

int main()
{
 std::thread threads[10];
 // spawn 10 threads:
 for (int i = 0; i<10; ++i)
  threads[i] = std::thread(print_id, i);
 std::cout << "10 threads ready to race...\n";

 std::thread(go).detach();   // go!

 for (auto& th : threads) th.join();

 return 0;
}

好了,到此為止,<condition_variable> 頭文件中的兩個條件變量類(std::condition_variable 和 std::condition_variable_any)、枚舉類型(std::cv_status)、以及輔助函數(std::notify_all_at_thread_exit())都已經介紹完了。

相關焦點

  • C++並發condition_variable
    condition_variable 類是同步原語,能用於阻塞一個線程,或同時阻塞多個線程,直至另一線程修改共享變量(條件)並通知 condition_variable
  • 走進C++11(三十)標準化條件變量 -- condition_variable
    std::condition_variable是條件變。Linux下使用 Pthread庫中的 pthread_cond_*() 函數提供了與條件變量相關的功能。和pthread_cond_*()一樣,我們可以使用條件變量(condition_variable)實現多個線程間的同步操作;當條件不滿足時,相關線程被一直阻塞,直到某種條件出現,這些線程才會被喚醒。
  • UNIX(多線程):16---條件變量
    當 std::condition_variable 對象的某個 wait 函數被調用的時候,它使用 std::unique_lock(封裝 std::mutex) 來鎖住當前線程。當前線程會一直被阻塞,直到另外一個線程在相同的 std::condition_variable 對象上調用了 notification 函數來喚醒當前線程。
  • C++ 條件變量使用詳解
    condition_variable介紹在C++11中,我們可以使用條件變量(condition_variable)實現多個線程間的同步操作;當條件不滿足時,相關線程被一直阻塞,直到某種條件出現,這些線程才會被喚醒。
  • C++11線程、鎖和條件變量
    使用條件變量可以將一個或多個線程進入阻塞狀態,直到收到另外一個線程的通知,或者超時或者發生了虛假喚醒,才能退出阻塞狀態。頭文件<condition_variable>中包含的條件變量有兩種實現:下面說說條件變量的工作原理: 必須至少要有一個等待條件變為true的線程。
  • C++並發與多線程__C++如何線程創建線程以及函數join()和detach()用法和區別
    前言:通常一個程序運行起來,也就等於一個進程在運行,這個進程中會有一個主線程自動創建並運行,當程序的main()函數返回之後那麼此主線程也就運行結束
  • C++11實現的100行線程池
    我來講講人話:你的函數需要在多線程中運行,但是你又不能每來一個函數就開啟一個線程,所以你就需要固定的N個線程來跑執行,但是有的線程還沒有執行完,有的又在空閒,如何分配任務呢,你就需要封裝一個線程池來完成這些操作,有了線程池這層封裝,你就只需要告訴它開啟幾個線程,然後直接塞任務就行了,然後通過一定的機制獲取執行結果。
  • Java並發編程系列21|Condition-Lock的等待通知
    waiter 線程獲取到鎖,檢查 flag=false 不滿足條件,執行 condition.await()方法將線程阻塞等待並釋放鎖。signaler 線程獲取到鎖之後更改條件,將 flag 變為 true,執行 condition.signalAll()通知喚醒等待線程,釋放鎖。
  • UNIX(多線程):07---線程啟動、結束,創建線程多法、join,detach
    線程啟動、結束,創建線程多法、join,detach範例演示線程運行的開始和結束#include
  • UNIX(多線程):14---理解線程構造函數
    第一參數的類型並不是c語言中的函數指針(c語言傳遞函數都是使用函數指針),在c++11中,增加了可調用對象(Callable Objects)的概念,總的來說,可調用對象可以是以下幾種情況:函數指針示例void function_1() {}void function_2(int i) {}void
  • Linux 多線程詳解 —— 什麼是線程
    線程是怎樣描述的?線程實際上也是一個task_struct,工作線程拷貝主線程的task_struct,然後共用主線程的mm_struct。線程ID是在用task_struct中pid描述的,而task_struct中tgid是線程組ID,表示線程屬於該線程組,對於主線程而言,其pid和tgid是相同的,我們一般看到的進程ID就是tgid。
  • C++多線程編程之創建線程的幾種方法
    << endl;    return 0;}    觀察上面的代碼發現:有兩個線程在運行,相當於整個程序的執行有兩條線在同時走。所以可以同時幹兩件事。即使一條線被堵住了,另外一條線仍然是可以通行的,這就是多線程。
  • 能不能推薦幾本 C++ 的書?
    另外,時至一個進程如何裝在各個 so 或 dll 文件的,這些文件被加載到進程地址空間的什麼位置,如何被執行,數據如何被交換。第三個基礎知識是狹義的作業系統原理。這裡加上「狹義」二字是因為從廣義上來講,以上所說的內容都是作業系統原理的範疇。狹義的作業系統原理這裡包括作業系統如何管理進程與線程,虛擬內存與物理內存之間的對應關係,何為內存映射文件,進程之間如何通信等等。
  • Linux 多線程詳解 —— 線程安全
    線程安全中涉及到的概念:臨界資源:多線程中都能訪問到的資源臨界區:每個線程內部,訪問臨界資源的代碼,就叫臨界區
  • Python中的Condition條件變量 - Python每日3題(多線程專題)
    這裡是Python7編程挑戰-多線程專題
  • C++11並發編程:多線程std::thread
    一:概述C++11引入了thread類,大大降低了多線程使用的複雜度,原先使用多線程只能用系統的API,無法解決跨平臺問題,一套代碼平臺移植,對應多線程代碼也必須要修改。現在在C++11中只需使用語言層面的thread可以解決這個問題。
  • Java多線程通信方式[二]
    加鎖目的synchronziedLock接口的Lock()、unlock()方法使得當前線程處於等待狀態wait()Condition接口提供的await()喚醒當前對象的線程notify()、notifyAll()Condition接口提供的singal()、signalAll()方法接下來使用Lock和Condition重構之前的第一個生產者和消費者代碼。pack
  • python3多線程實現一個語音報時,定時關機功能的應用
    python的語法類似與c++,但是比c++更靈活一些。如果您學習過c++,那肯定很快就入門python了。但是入門簡單不代表深入還簡單哦!任何一款程式語言,您學習後用到項目中,就會發現項目中遇到的問題,往往不是程式語言本身而是行業技術的限制或通病。例如您覺得python上手很簡單,但讓你用python去做一個人工智慧應用出來。您肯定犯難!為什麼呢?
  • Variable Expenses(可變開銷)
    After you have listed your fixed expenses you will want to determine the amount that you spend on variable
  • Java線程同步-synchronize
    線程幹擾請看一個簡單的稱為Count類public class Counter {private int c = 0;public void increment() {    c++;}public void decrement() {    c--;}public