多線程的數據訪問一直很讓程式設計師們頭大,有什麼簡便方法來實現多線程間的通呢?試試C++11封裝的原子數據類型(std::atomic)吧。
簡單地說,原子數據類型能保證線程之間不會發生數據競爭(data race),因此能保證線程安全(thread safe)。
對於我們用戶來說,就不需要對需要多線程訪問的數據添加互斥鎖。從不同線程訪問某個原子對象是良性(well-defined) 行為,而通常對於非原子類型而言,並發訪問某個對象(如果不做任何同步操作)會導致未定義(undefined) 行為發生。因此,從實現的原理上來,可以理解為原子類型在自己內部添加了互斥鎖。
我們先創建一個CMake工程,並創建兩個線程來操作同一個全局變量,然後來編譯運行,查看它的結果。接下來,我們會針對這個結果進行改造,使之達到我們想要的「線程安全」的要求。測試環境工程1:不使用原子數據類型第一個工程,演示的是如果不使用原子數據類型,程序會產生一定的錯誤。工程代碼可在以上GitHub連結中找到,工程文件名為no-atomic.cpp,內容如下:
#include <atomic>
#include <ctime>
#include <iostream>
#include <thread>
#include <unistd.h>
int data( 0 );
void threadFunc() {
for ( int i = 0; i < 2000; i++ ) {
usleep( 1 );
data++;
}
}
int main( int argc, char* argv[] ) {
std::thread th1( threadFunc );
std::thread th2( threadFunc );
th1.join();
th2.join();
usleep( 20000 );
std::cout << "data = " << data << std::endl;
return 0;
}
這裡的data是一個全局變量(當然實際工程當中使用全局變量不是一個好的習慣,我們這裡只是演示),初始值為0。
在main()函數裡,我們創建了兩個調用threadFunc()的線程:th1()和th2()。
threadFunc()函數很簡單,就是執行data++兩千次。
所以,我們這個小工程,期望兩個線程對data操作之後,它最終的結果是4000。但是,實際情況是這樣的嗎?我們來編譯(會編譯工程裡的所有文件)並運行:
cd <path/to/article-003-atomic/
mkdir build
cd build
cmake ..
make
./no-atomic
我們得到的結果是:
$ ./no-atomic
summation = 3911
而且,多次運行,它的結果是不一樣的!這就說明了, 這個工程的兩個線程在訪問全局變量的時候,產生了一定的錯誤。簡單的說,它們之間發生了數據競爭的情況,導致讀寫出錯。下面,我們針對這一個工程,做一個小小的改造。
工程2:原子操作-簡單用法我們把上面程序裡的全局變量data的定義做一下修改,即把:
int data( 0 );
修改成:
std::atomic< int > data( 0 );
最終的文件,請查看第二個工程文件atomic-simple.cpp。簡單來說,就是data的數據類型不是int型,而是std::atomic<int>型。
我們編譯之後再運行:
$ ./atomic-simple
data = 4000
這一次,它的運行結果是4000了!說明原子數據類型在這裡解決了之前我們遇到的問題。是不是很神奇呢?
關於CMake編譯我們使用CMake來管理前面的兩個工程。以下是前面兩個工程CMakeLists.txt文件當中的相關內容:
#
add_executable(
no-atomic
no-atomic.cpp
)
target_link_libraries(
no-atomic
pthread
)
#
add_executable(
atomic-simple
atomic-simple.cpp
)
target_link_libraries(
atomic-simple
pthread
atomic
)
ldconfig -p | grep libatomic
我們得到的返回是:
libatomic.so.1 (libc6,x32) => /libx32/libatomic.so.1
libatomic.so.1 (libc6,x86-64) => /lib/x86_64-linux-gnu/libatomic.so.1
libatomic.so.1 (libc6) => /lib32/libatomic.so.1
對於類和結構體這樣稍微複雜的數據類型,工程2的簡單方法並不適用。這時,我們需要使用指針,先看代碼(工程文件為atomic-class.cpp):
class Test {
public:
Test( double a, float b, double c ) {
this->a = a;
this->b[0] = b;
this->c.push_back( c );
};
double a;
float b[100];
std::vector< double > c;
};
std::atomic< Test* > msg;
void threadFunc() {
for ( int i = 0; i < 2000; i++ ) {
usleep( 1 );
Test* n = msg.load( std::memory_order_relaxed );
n->a++;
n->c.push_back( i );
msg.store( n, std::memory_order_relaxed );
}
}
int main( int argc, char* argv[] ) {
msg = new Test( 0, 0, 0 );
std::thread th1( threadFunc );
std::thread th2( threadFunc );
th1.join();
th2.join();
Test* n = msg.load( std::memory_order_relaxed );
std::cout << "msg->a: " << n->a << "\n"
<< "msg->b[0]: " << n->b[0] << "\n"
<< "msg->c.size(): " << n->c.size() << "\n";
}
我們運行一下:
$ ./atomic-class
msg->a: 3995
msg->b[0]: 0
msg->c.size(): 3995
這一次,結果似乎又不符合我們的預期呢。是不是std::atomic有問題?其實並不是。真正的原因是因為msg沒有被鎖死:當線程th1()讀取了msg的指針後,會將msg的指針釋放出來,這時線程th2()也可以讀取msg的指針,兩個線程對a同時相加,再分別賦值給了a。std::atomic的原子操作,在這個工程裡面,只保證了msg的指針在多線程的讀寫過程中具有原子性(即讀寫不會出錯),但並沒有保證指針所指向的數據具有原子性。
另外,這個工程,還有可能出現內存錯誤:double free or corruption (!prev) 而不能運行。
這裡,我們需要對數據進行上鎖操作,改造繼續!
工程4:帶mutex的原子操作在工程3的基本上,我們添加了std::mutex類型的數據mutex,關鍵代碼如下(具體代碼可見工程文件atomic-mutex.cpp):
class Test {
public:
...
...
std::mutex mutex;
};
void threadFunc() {
for ( int i = 0; i < 2000; i++ ) {
usleep( 1 );
Test* n = msg.load( std::memory_order_relaxed );
n->mutex.lock();
n->a++;
n->c.push_back( i );
n->mutex.unlock();
msg.store( n, std::memory_order_relaxed );
}
}
即,我們按以下流程來處理數據:
編程不易,總算是出現正確數據了:
$ ./atomic-mutex
msg->a: 4000
msg->b[0]: 0
msg->c.size(): 4001
這幾個小例子大家學會了嘛?下面接著再來看看atomic的常用成員函數吧。
原子操作常用成員函數第四個工程,我們使用了load()和store()成員函數,分別用於讀取和保存被封裝的值。而且,我們指定了內存序(Memory Order)的一種,即std::memory_order_relaxed。內存序還可以為以下的值:
typedef enum memory_order {
memory_order_relaxed, // 不對執行順序做保證
memory_order_acquire, // 本線程中,所有後續的讀操作必須在本條原子操作完成後執行
memory_order_release, // 本線程中,所有之前的寫操作完成後才能執行本條原子操作
memory_order_acq_rel, // 同時包含 memory_order_acquire 和 memory_order_release
memory_order_consume, // 本線程中,所有後續的有關本原子類型的操作,必須在本條原子操作完成之後執行
memory_order_seq_cst // 全部存取都按順序執行
} memory_order;
不同的內存序應該用在什麼樣的條件下,還請小夥伴自己研究吧。本文只能點到為止啦。
原子操作還有另外兩個重要的成員函數:
is_lock_free()用來判斷對象是否具備 lock-free 的特性,如果訪問對象具備無鎖的特性,當多個線程訪問該對象時就不會導致線程阻塞。
exchange() :
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept;
它會將 val 指定的值替換掉之前該原子對象封裝的值,並返回之前該原子對象封裝的值,整個過程是具有原子性的。因此 exchange() 操作也稱為 read-modify-write 操作。
總結std::atomic對於多線程通信來說,大大降低了開發的難度,提高了編程效率,並且簡單易用。多試試吧,它將會給你意外的驚喜。
參考資料C++ 並發編程指南
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter7-Atomic/7.3%20Atomic-tutorial.md
C++ 並發指南-atomic 指針的使用(三)
https://blog.csdn.net/weixin_40490238/article/details/108542060