日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > linux >内容正文

linux

[C++11 std::thread] 使用C++11 编写 Linux 多线程程序

發(fā)布時(shí)間:2023/12/9 linux 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [C++11 std::thread] 使用C++11 编写 Linux 多线程程序 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

From: http://www.ibm.com/developerworks/cn/linux/1412_zhupx_thread/index.html

?

本文講述了如何使用 C++11 編寫 Linux 下的多線程程序,如何使用鎖,以及相關(guān)的注意事項(xiàng),還簡述了 C++11 引入的一些高級(jí)概念如 promise/future 等。

前言

在這個(gè)多核時(shí)代,如何充分利用每個(gè) CPU 內(nèi)核是一個(gè)繞不開的話題,從需要為成千上萬的用戶同時(shí)提供服務(wù)的服務(wù)端應(yīng)用程序,到需要同時(shí)打開十幾個(gè)頁面,每個(gè)頁面都有幾十上百個(gè)鏈接的 web 瀏覽器應(yīng)用程序,從保持著幾 t 甚或幾 p 的數(shù)據(jù)的數(shù)據(jù)庫系統(tǒng),到手機(jī)上的一個(gè)有良好用戶響應(yīng)能力的 app,為了充分利用每個(gè) CPU 內(nèi)核,都會(huì)想到是否可以使用多線程技術(shù)。這里所說的“充分利用”包含了兩個(gè)層面的意思,一個(gè)是使用到所有的內(nèi)核,再一個(gè)是內(nèi)核不空閑,不讓某個(gè)內(nèi)核長時(shí)間處于空閑狀態(tài)。在 C++98 的時(shí)代,C++標(biāo)準(zhǔn)并沒有包含多線程的支持,人們只能直接調(diào)用操作系統(tǒng)提供的 SDK API 來編寫多線程程序,不同的操作系統(tǒng)提供的 SDK API 以及線程控制能力不盡相同,到了 C++11,終于在標(biāo)準(zhǔn)之中加入了正式的多線程的支持,從而我們可以使用標(biāo)準(zhǔn)形式的類來創(chuàng)建與執(zhí)行線程,也使得我們可以使用標(biāo)準(zhǔn)形式的鎖、原子操作、線程本地存儲(chǔ) (TLS) 等來進(jìn)行復(fù)雜的各種模式的多線程編程,而且,C++11 還提供了一些高級(jí)概念,比如 promise/future,packaged_task,async 等以簡化某些模式的多線程編程。

多線程可以讓我們的應(yīng)用程序擁有更加出色的性能,同時(shí),如果沒有用好,多線程又是比較容易出錯(cuò)的且難以查找錯(cuò)誤所在,甚至可以讓人們覺得自己陷進(jìn)了泥潭,希望本文能夠幫助您更好地使用 C++11 來進(jìn)行 Linux 下的多線程編程。

認(rèn)識(shí)多線程

首先我們應(yīng)該正確地認(rèn)識(shí)線程。維基百科對(duì)線程的定義是:線程是一個(gè)編排好的指令序列,這個(gè)指令序列(線程)可以和其它的指令序列(線程)并行執(zhí)行,操作系統(tǒng)調(diào)度器將線程作為最小的 CPU 調(diào)度單元。在進(jìn)行架構(gòu)設(shè)計(jì)時(shí),我們應(yīng)該多從操作系統(tǒng)線程調(diào)度的角度去考慮應(yīng)用程序的線程安排,而不僅僅是代碼。

當(dāng)只有一個(gè) CPU 內(nèi)核可供調(diào)度時(shí),多個(gè)線程的運(yùn)行示意如下:

圖 1、單個(gè) CPU 內(nèi)核上的多個(gè)線程運(yùn)行示意圖

我們可以看到,這時(shí)的多線程本質(zhì)上是單個(gè) CPU 的時(shí)間分片,一個(gè)時(shí)間片運(yùn)行一個(gè)線程的代碼,它可以支持并發(fā)處理,但是不能說是真正的并行計(jì)算。

當(dāng)有多個(gè) CPU 或者多個(gè)內(nèi)核可供調(diào)度時(shí),可以做到真正的并行計(jì)算,多個(gè)線程的運(yùn)行示意如下:

圖 2、雙核 CPU 上的多個(gè)線程運(yùn)行示意圖

從上述兩圖,我們可以直接得到使用多線程的一些常見場(chǎng)景:

  • 進(jìn)程中的某個(gè)線程執(zhí)行了一個(gè)阻塞操作時(shí),其它線程可以依然運(yùn)行,比如,等待用戶輸入或者等待網(wǎng)絡(luò)數(shù)據(jù)包的時(shí)候處理啟動(dòng)后臺(tái)線程處理業(yè)務(wù),或者在一個(gè)游戲引擎中,一個(gè)線程等待用戶的交互動(dòng)作輸入,另外一個(gè)線程在后臺(tái)合成下一幀要畫的圖像或者播放背景音樂等。
  • 將某個(gè)任務(wù)分解為小的可以并行進(jìn)行的子任務(wù),讓這些子任務(wù)在不同的 CPU 或者內(nèi)核上同時(shí)進(jìn)行計(jì)算,然后匯總結(jié)果,比如歸并排序,或者分段查找,這樣子來提高任務(wù)的執(zhí)行速度。

需要注意一點(diǎn),因?yàn)閱蝹€(gè) CPU 內(nèi)核下多個(gè)線程并不是真正的并行,有些問題,比如 CPU 緩存不一致問題,不一定能表現(xiàn)出來,一旦這些代碼被放到了多核或者多 CPU 的環(huán)境運(yùn)行,就很可能會(huì)出現(xiàn)“在開發(fā)測(cè)試環(huán)境一切沒有問題,到了實(shí)施現(xiàn)場(chǎng)就莫名其妙”的情況,所以,在進(jìn)行多線程開發(fā)時(shí),開發(fā)與測(cè)試環(huán)境應(yīng)該是多核或者多 CPU 的,以避免出現(xiàn)這類情況。

C++11 的線程類 std::thread

C++11 的標(biāo)準(zhǔn)類 std::thread 對(duì)線程進(jìn)行了封裝,它的聲明放在頭文件 thread 中,其中聲明了線程類 thread, 線程標(biāo)識(shí)符 id,以及名字空間 this_thread,按照 C++11 規(guī)范,這個(gè)頭文件至少應(yīng)該兼容如下內(nèi)容:

清單 1.例子 thread 頭文件主要內(nèi)容
namespace std{struct thread{// native_handle_type 是連接 thread 類和操作系統(tǒng) SDK API 之間的橋梁。typedef implementation-dependent native_handle_type;native_handle_type native_handle();//struct id{id() noexcept;// 可以由==, < 兩個(gè)運(yùn)算衍生出其它大小關(guān)系運(yùn)算。bool operator==(thread::id x, thread::id y) noexcept;bool operator<(thread::id x, thread::id y) noexcept;template<class charT, class traits>basic_ostream<charT, traits>&operator<<(basic_ostream<charT, traits>&out, thread::id id);// 哈希函數(shù)template <class T> struct hash;template <> struct hash<thread::id>;};id get_id() const noexcept;// 構(gòu)造與析構(gòu)thread() noexcept;template<class F, class… Args> explicit thread(F&f, Args&&… args);~thread();thread(const thread&) = delete;thread(thread&&) noexcept;thread& operator=( const thread&) = delete;thread& operator=(thread&&) noexcept;//void swap(thread&) noexcept;bool joinable() const noexcept;void join();void detach();// 獲取物理線程數(shù)目static unsigned hardware_concurrency() noexcept;}namespace this_thead{thread::id get_id();void yield();template<class Clock, class Duration>void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);template<class Rep, class Period>void sleep_for(const chromo::duration<Rep, Period>& rel_time);} }

和有些語言中定義的線程不同,C++11 所定義的線程是和操作系的線程是一一對(duì)應(yīng)的,也就是說我們生成的線程都是直接接受操作系統(tǒng)的調(diào)度的,通過操作系統(tǒng)的相關(guān)命令(比如 ps -M 命令)是可以看到的,一個(gè)進(jìn)程所能創(chuàng)建的線程數(shù)目以及一個(gè)操作系統(tǒng)所能創(chuàng)建的總的線程數(shù)目等都由運(yùn)行時(shí)操作系統(tǒng)限定。

native_handle_type 是連接 thread 類和操作系統(tǒng) SDK API 之間的橋梁,在 g++(libstdc++) for Linux 里面,native_handle_type 其實(shí)就是 pthread 里面的 pthread_t 類型,當(dāng) thread 類的功能不能滿足我們的要求的時(shí)候(比如改變某個(gè)線程的優(yōu)先級(jí)),可以通過 thread 類實(shí)例的 native_handle() 返回值作為參數(shù)來調(diào)用相關(guān)的 pthread 函數(shù)達(dá)到目的。thread::id 定義了在運(yùn)行時(shí)操作系統(tǒng)內(nèi)唯一能夠標(biāo)識(shí)該線程的標(biāo)識(shí)符,同時(shí)其值還能指示所標(biāo)識(shí)的線程的狀態(tài),其默認(rèn)值 (thread::id()) 表示不存在可控的正在執(zhí)行的線程(即空線程,比如,調(diào)用 thead() 生成的沒有指定入口函數(shù)的線程類實(shí)例),當(dāng)一個(gè)線程類實(shí)例的 get_id() 等于默認(rèn)值的時(shí)候,即 get_id() == thread::id(),表示這個(gè)線程類實(shí)例處于下述狀態(tài)之一:

  • 尚未指定運(yùn)行的任務(wù)
  • 線程運(yùn)行完畢
  • 線程已經(jīng)被轉(zhuǎn)移 (move) 到另外一個(gè)線程類實(shí)例
  • 線程已經(jīng)被分離 (detached)

空線程 id 字符串表示形式依具體實(shí)現(xiàn)而定,有些編譯器為 0x0,有些為一句語義解釋。

有時(shí)候我們需要在線程執(zhí)行代碼里面對(duì)當(dāng)前調(diào)用者線程進(jìn)行操作,針對(duì)這種情況,C++11 里面專門定義了一個(gè)名字空間 this_thread,其中包括 get_id() 函數(shù)可用來獲取當(dāng)前調(diào)用者線程的 id,yield() 函數(shù)可以用來將調(diào)用者線程跳出運(yùn)行狀態(tài),重新交給操作系統(tǒng)進(jìn)行調(diào)度,sleep_until 和 sleep_for 函數(shù)則可以讓調(diào)用者線程休眠若干時(shí)間。get_id() 函數(shù)實(shí)際上是通過調(diào)用 pthread_self() 函數(shù)獲得調(diào)用者線程的標(biāo)識(shí)符,而 yield() 函數(shù)則是通過調(diào)用操作系統(tǒng) API sched_yield() 進(jìn)行調(diào)度切換。

如何創(chuàng)建和結(jié)束一個(gè)線程

和 pthread_create 不同,使用 thread 類創(chuàng)建線程可以使用一個(gè)函數(shù)作為入口,也可以是其它的 Callable 對(duì)象,而且,可以給入口傳入任意個(gè)數(shù)任意類型的參數(shù):

清單 2.例子 thread_run_func_var_args.cc
int funcReturnInt(const char* fmt, ...){va_list ap;va_start(ap, fmt);vprintf( fmt, ap );va_end(ap);return 0xabcd; } void threadRunFunction(void){thread* t = new thread(funcReturnInt, "%d%s\n", 100, "\%");t->join();delete t; } 我們也可以傳入一個(gè) Lambda 表達(dá)式作為入口,比如:
清單 3.例子 thread_run_lambda.cc
void threadRunLambda(void){int a = 100,b = 200;thread* t = new thread( [](int ia, int ib){cout << (ia + ib) << endl;},a,b );t->join();delete t; }

一個(gè)類的成員函數(shù)也可以作為線程入口:

清單 4.例子 thread_run_member_func.cc
struct God{void create(const char* anything){cout << "create " << anything << endl;} }; void threadRunMemberFunction(void){God god;thread* t = new thread( &God::create, god, "the world" );t->join();delete t; }

雖然 thread 類的初始化可以提供這么豐富和方便的形式,其實(shí)現(xiàn)的底層依然是創(chuàng)建一個(gè) pthread 線程并運(yùn)行之,有些實(shí)現(xiàn)甚至是直接調(diào)用 pthread_create 來創(chuàng)建。

創(chuàng)建一個(gè)線程之后,我們還需要考慮一個(gè)問題:該如何處理這個(gè)線程的結(jié)束?一種方式是等待這個(gè)線程結(jié)束,在一個(gè)合適的地方調(diào)用 thread 實(shí)例的 join() 方法,調(diào)用者線程將會(huì)一直等待著目標(biāo)線程的結(jié)束,當(dāng)目標(biāo)線程結(jié)束之后調(diào)用者線程繼續(xù)運(yùn)行;另一個(gè)方式是將這個(gè)線程分離,由其自己結(jié)束,通過調(diào)用 thread 實(shí)例的 detach() 方法將目標(biāo)線程置于分離模式。一個(gè)線程的 join() 方法與 detach() 方法只能調(diào)用一次,不能在調(diào)用了 join() 之后又調(diào)用 detach(),也不能在調(diào)用 detach() 之后又調(diào)用 join(),在調(diào)用了 join() 或者 detach() 之后,該線程的 id 即被置為默認(rèn)值(空線程),表示不能繼續(xù)再對(duì)該線程作修改變化。如果沒有調(diào)用 join() 或者 detach(),那么,在析構(gòu)的時(shí)候,該線程實(shí)例將會(huì)調(diào)用 std::terminate(),這會(huì)導(dǎo)致整個(gè)進(jìn)程退出,所以,如果沒有特別需要,一般都建議在生成子線程后調(diào)用其 join() 方法等待其退出,這樣子最起碼知道這些子線程在什么時(shí)候已經(jīng)確保結(jié)束。

在 C++11 里面沒有提供 kill 掉某個(gè)線程的能力,只能被動(dòng)地等待某個(gè)線程的自然結(jié)束,如果我們要主動(dòng)停止某個(gè)線程的話,可以通過調(diào)用 Linux 操作系統(tǒng)提供的 pthread_kill 函數(shù)給目標(biāo)線程發(fā)送信號(hào)來實(shí)現(xiàn),示例如下:

清單 5.例子 thread_kill.cc
static void on_signal_term(int sig){cout << "on SIGTERM:" << this_thread::get_id() << endl;pthread_exit(NULL); } void threadPosixKill(void){signal(SIGTERM, on_signal_term);thread* t = new thread( [](){while(true){++counter;}});pthread_t tid = t->native_handle();cout << "tid=" << tid << endl;// 確保子線程已經(jīng)在運(yùn)行。this_thread::sleep_for( chrono::seconds(1) );pthread_kill(tid, SIGTERM);t->join();delete t;cout << "thread destroyed." << endl; }

上述例子還可以用來給某個(gè)線程發(fā)送其它信號(hào),具體的 pthread_exit 函數(shù)調(diào)用的約定依賴于具體的操作系統(tǒng)的實(shí)現(xiàn),所以,這個(gè)方法是依賴于具體的操作系統(tǒng)的,而且,因?yàn)樵?C++11 里面沒有這方面的具體約定,用這種方式也是依賴于 C++編譯器的具體實(shí)現(xiàn)的。

線程類 std::thread 的其它方法和特點(diǎn)

thread 類是一個(gè)特殊的類,它不能被拷貝,只能被轉(zhuǎn)移或者互換,這是符合線程的語義的,不要忘記這里所說的線程是直接被操作系統(tǒng)調(diào)度的。線程的轉(zhuǎn)移使用 move 函數(shù),示例如下:

清單 6.例子 thread_move.cc
void threadMove(void){int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2 = move(t); // 改為 t2 = t 將不能編譯。t2.join();cout << "a=" << a << endl; }

在這個(gè)例子中,如果將 t2.join() 改為 t.join() 將會(huì)導(dǎo)致整個(gè)進(jìn)程被結(jié)束,因?yàn)橥浟苏{(diào)用 t2 也就是被轉(zhuǎn)移的線程的 join() 方法,從而導(dǎo)致整個(gè)進(jìn)程被結(jié)束,而 t 則因?yàn)橐呀?jīng)被轉(zhuǎn)移,其 id 已被置空。

線程實(shí)例互換使用 swap 函數(shù),示例如下:

清單 7.例子 thread_swap.cc
void threadSwap(void){int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2;cout << "before swap: t=" << t.get_id() << ", t2=" << t2.get_id() << endl;swap(t, t2);cout << "after swap : t=" << t.get_id() << ", t2=" << t2.get_id() << endl;t2.join();cout << "a=" << a << endl; }

互換和轉(zhuǎn)移很類似,但是互換僅僅進(jìn)行實(shí)例(以 id 作標(biāo)識(shí))的互換,而轉(zhuǎn)移則在進(jìn)行實(shí)例標(biāo)識(shí)的互換之前,還進(jìn)行了轉(zhuǎn)移目的實(shí)例(如下例的t2)的清理,如果 t2 是可聚合的(joinable() 方法返回 true),則調(diào)用 std::terminate(),這會(huì)導(dǎo)致整個(gè)進(jìn)程退出,比如下面這個(gè)例子:

清單 8.例子 thread_move_term.cc
void threadMoveTerm(void){int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2( [](){int i = 0;for(;;)i++;} );t2 = move(t); // 將會(huì)導(dǎo)致 std::terminate()cout << "should not reach here" << endl;t2.join(); }

所以,在進(jìn)行線程實(shí)例轉(zhuǎn)移的時(shí)候,要注意判斷目的實(shí)例的 id 是否為空值(即 id())。

如果我們繼承了 thread 類,則還需要禁止拷貝構(gòu)造函數(shù)、拷貝賦值函數(shù)以及賦值操作符重載函數(shù)等,另外,thread 類的析構(gòu)函數(shù)并不是虛析構(gòu)函數(shù)。示例如下:

清單 9.例子 thread_inherit.cc
class MyThread : public thread{ public:MyThread() noexcept : thread(){};template<typename Callable, typename... Args>explicitMyThread(Callable&& func, Args&&... args) : thread( std::forward<Callable>(func), std::forward<Args>(args)...){}~MyThread() { thread::~thread(); }// disable copy constructorsMyThread( MyThread& ) = delete;MyThread( const MyThread& ) = delete;MyThread& operator=(const MyThread&) = delete; };

因?yàn)?thread 類的析構(gòu)函數(shù)不是虛析構(gòu)函數(shù),在上例中,需要避免出現(xiàn)下面這種情況:

MyThread* tc = new MyThread(...);

...

thread* tp = tc;

...

delete tp;

這種情況會(huì)導(dǎo)致 MyThread 的析構(gòu)函數(shù)沒有被調(diào)用。

線程的調(diào)度

我們可以調(diào)用 this_thread::yield() 將當(dāng)前調(diào)用者線程切換到重新等待調(diào)度,但是不能對(duì)非調(diào)用者線程進(jìn)行調(diào)度切換,也不能讓非調(diào)用者線程休眠(這是操作系統(tǒng)調(diào)度器干的活)。

清單 10.例子 thread_yield.cc
void threadYield(void){unsigned int procs = thread::hardware_concurrency(), // 獲取物理線程數(shù)目i = 0;thread* ta = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){this_thread::yield();}gettimeofday(&t2, NULL);print_time(t1, t2, " with yield");} );thread** tb = new thread*[ procs ];for( i = 0; i < procs; i++){tb[i] = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){do_nothing();}gettimeofday(&t2, NULL);print_time(t1, t2, "without yield");});}ta->join();delete ta;for( i = 0; i < procs; i++){tb[i]->join();delete tb[i];};delete tb; }

ta 線程因?yàn)樾枰?jīng)常切換去重新等待調(diào)度,它運(yùn)行的時(shí)間要比 tb 要多,比如在作者的機(jī)器上運(yùn)行得到如下結(jié)果:

$time ./a.out without yield elapse 0.050199s without yield elapse 0.051042s without yield elapse 0.05139s without yield elapse 0.048782swith yield elapse 1.63366s real 0m1.643s user 0m1.175s sys 0m0.611s

ta 線程即使扣除系統(tǒng)調(diào)用運(yùn)行時(shí)間 0.611s 之后,它的運(yùn)行時(shí)間也遠(yuǎn)大于沒有進(jìn)行切換的線程。

C++11 沒有提供調(diào)整線程的調(diào)度策略或者優(yōu)先級(jí)的能力,如果需要,只能通過調(diào)用相關(guān)的 pthread 函數(shù)來進(jìn)行,需要的時(shí)候,可以通過調(diào)用 thread 類實(shí)例的 native_handle() 方法或者操作系統(tǒng) API pthread_self() 來獲得 pthread 線程 id,作為 pthread 函數(shù)的參數(shù)。

線程間的數(shù)據(jù)交互和數(shù)據(jù)爭用 (Data Racing)

同一個(gè)進(jìn)程內(nèi)的多個(gè)線程之間多是免不了要有數(shù)據(jù)互相來往的,隊(duì)列和共享數(shù)據(jù)是實(shí)現(xiàn)多個(gè)線程之間的數(shù)據(jù)交互的常用方式,封裝好的隊(duì)列使用起來相對(duì)來說不容易出錯(cuò)一些,而共享數(shù)據(jù)則是最基本的也是較容易出錯(cuò)的,因?yàn)樗鼤?huì)產(chǎn)生數(shù)據(jù)爭用的情況,即有超過一個(gè)線程試圖同時(shí)搶占某個(gè)資源,比如對(duì)某塊內(nèi)存進(jìn)行讀寫等,如下例所示:

清單 11.例子 thread_data_race.cc
static void inc(int *p ){for(int i = 0; i < COUNT; i++){(*p)++;} } void threadDataRacing(void){int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

這是簡化了的極端情況,我們可以一眼看出來這是兩個(gè)線程在同時(shí)對(duì)&a 這個(gè)內(nèi)存地址進(jìn)行寫操作,但是在實(shí)際工作中,在代碼的海洋中發(fā)現(xiàn)它并不一定容易。從表面看,兩個(gè)線程執(zhí)行完之后,最后的 a 值應(yīng)該是 COUNT * 2,但是實(shí)際上并非如此,因?yàn)楹唵稳?(*p)++這樣的操作并不是一個(gè)原子動(dòng)作,要解決這個(gè)問題,對(duì)于簡單的基本類型數(shù)據(jù)如字符、整型、指針等,C++提供了原子模版類 atomic,而對(duì)于復(fù)雜的對(duì)象,則提供了最常用的鎖機(jī)制,比如互斥類 mutex,門鎖 lock_guard,唯一鎖 unique_lock,條件變量 condition_variable 等。

現(xiàn)在我們使用原子模版類 atomic 改造上述例子得到預(yù)期結(jié)果:

清單 12.例子 thread_atomic.cc
static void inc(atomic<int> *p ){for(int i = 0; i < COUNT; i++){(*p)++;} } void threadDataRacing(void){atomic<int> a(0) ;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

我們也可以使用 lock_guard,lock_guard 是一個(gè)范圍鎖,本質(zhì)是 RAII(Resource Acquire Is Initialization),在構(gòu)建的時(shí)候自動(dòng)加鎖,在析構(gòu)的時(shí)候自動(dòng)解鎖,這保證了每一次加鎖都會(huì)得到解鎖。即使是調(diào)用函數(shù)發(fā)生了異常,在清理?xiàng)臅r(shí)候也會(huì)調(diào)用它的析構(gòu)函數(shù)得到解鎖,從而保證每次加鎖都會(huì)解鎖,但是我們不能手工調(diào)用加鎖方法或者解鎖方法來進(jìn)行更加精細(xì)的資源占用管理,使用 lock_guard 示例如下:

清單 13.例子 thread_lock_guard.cc
static mutex g_mutex; static void inc(int *p ){for(int i = 0; i < COUNT; i++){lock_guard<mutex> _(g_mutex);(*p)++;} } void threadLockGuard(void){int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

如果要支持手工加鎖,可以考慮使用 unique_lock 或者直接使用 mutex。unique_lock 也支持 RAII,它也可以一次性將多個(gè)鎖加鎖;如果使用 mutex 則直接調(diào)用 mutex 類的 lock, unlock, trylock 等方法進(jìn)行更加精細(xì)的鎖管理:

清單 14.例子 thread_mutex.cc
static mutex g_mutex; static void inc(int *p ){thread_local int i; // TLS 變量for(; i < COUNT; i++){g_mutex.lock();(*p)++;g_mutex.unlock();} } void threadMutex(void){int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

在上例中,我們還使用了線程本地存儲(chǔ) (TLS) 變量,我們只需要在變量前面聲明它是 thread_local 即可。TLS 變量在線程棧內(nèi)分配,線程棧只有在線程創(chuàng)建之后才生效,在線程退出的時(shí)候銷毀,需要注意不同系統(tǒng)的線程棧的大小是不同的,如果 TLS 變量占用空間比較大,需要注意這個(gè)問題。TLS 變量一般不能跨線程,其初始化在調(diào)用線程第一次使用這個(gè)變量時(shí)進(jìn)行,默認(rèn)初始化為 0。

對(duì)于線程間的事件通知,C++11 提供了條件變量類 condition_variable,可視為 pthread_cond_t 的封裝,使用條件變量可以讓一個(gè)線程等待其它線程的通知 (wait,wait_for,wait_until),也可以給其它線程發(fā)送通知 (notify_one,notify_all),條件變量必須和鎖配合使用,在等待時(shí)因?yàn)橛薪怄i和重新加鎖,所以,在等待時(shí)必須使用可以手工解鎖和加鎖的鎖,比如 unique_lock,而不能使用 lock_guard,示例如下:

清單 15.例子 thread_cond_var.cc
#include <thread> #include <iostream> #include <condition_variable> using namespace std; mutex m; condition_variable cv; void threadCondVar(void){ # define THREAD_COUNT 10thread** t = new thread*[THREAD_COUNT];int i;for(i = 0; i < THREAD_COUNT; i++){t[i] = new thread( [](int index){unique_lock<mutex> lck(m);cv.wait_for(lck, chrono::hours(1000));cout << index << endl;}, i );this_thread::sleep_for( chrono::milliseconds(50));}for(i = 0; i < THREAD_COUNT; i++){lock_guard<mutex> _(m);cv.notify_one();}for(i = 0; i < THREAD_COUNT; i++){t[i]->join();delete t[i];}delete t; }

從上例的運(yùn)行結(jié)果也可以看到,條件變量是不保證次序的,即首先調(diào)用 wait 的不一定首先被喚醒。

幾個(gè)高級(jí)概念

C++11 提供了若干多線程編程的高級(jí)概念:promise/future, packaged_task, async,來簡化多線程編程,尤其是線程之間的數(shù)據(jù)交互比較簡單的情況下,讓我們可以將注意力更多地放在業(yè)務(wù)處理上。

promise/future 可以用來在線程之間進(jìn)行簡單的數(shù)據(jù)交互,而不需要考慮鎖的問題,線程 A 將數(shù)據(jù)保存在一個(gè) promise 變量中,另外一個(gè)線程 B 可以通過這個(gè) promise 變量的 get_future() 獲取其值,當(dāng)線程 A 尚未在 promise 變量中賦值時(shí),線程 B 也可以等待這個(gè) promise 變量的賦值:

清單 16.例子 thread_promise_future.cc
promise<string> val; static void threadPromiseFuture(){thread ta([](){future<string> fu = val.get_future();cout << "waiting promise->future" << endl;cout << fu.get() << endl;});thread tb([](){this_thread::sleep_for( chrono::milliseconds(100) );val.set_value("promise is set");});ta.join();tb.join(); }

一個(gè) future 變量只能調(diào)用一次 get(),如果需要多次調(diào)用 get(),可以使用 shared_future,通過 promise/future 還可以在線程之間傳遞異常。

如果將一個(gè) callable 對(duì)象和一個(gè) promise 組合,那就是 packaged_task,它可以進(jìn)一步簡化操作:

清單 17.例子 thread_packaged_task.cc
static mutex g_mutex; static void threadPackagedTask(){auto run = [=](int index){ {lock_guard<mutex> _(g_mutex);cout << "tasklet " << index << endl;}this_thread::sleep_for( chrono::seconds(10) );return index * 1000;};packaged_task<int(int)> pt1(run);packaged_task<int(int)> pt2(run);thread t1([&](){pt1(2);} );thread t2([&](){pt2(3);} );int f1 = pt1.get_future().get();int f2 = pt2.get_future().get();cout << "task result=" << f1 << endl;cout << "task result=" << f2 << endl;t1.join();t2.join(); }

我們還可以試圖將一個(gè) packaged_task 和一個(gè)線程組合,那就是 async() 函數(shù)。使用 async() 函數(shù)啟動(dòng)執(zhí)行代碼,返回一個(gè) future 對(duì)象來保存代碼返回值,不需要我們顯式地創(chuàng)建和銷毀線程等,而是由 C++11 庫的實(shí)現(xiàn)決定何時(shí)創(chuàng)建和銷毀線程,以及創(chuàng)建幾個(gè)線程等,示例如下:

清單 18.例子 thread_async.cc
static long do_sum(vector<long> *arr, size_t start, size_t count){static mutex _m;long sum = 0;for(size_t i = 0; i < count; i++){sum += (*arr)[start + i];}{lock_guard<mutex> _(_m);cout << "thread " << this_thread::get_id() << ", count=" << count<< ", sum=" << sum << endl;}return sum; } static void threadAsync(){ # define COUNT 1000000vector<long> data(COUNT);for(size_t i = 0; i < COUNT; i++){data[i] = random() & 0xff;}//vector< future<long> > result;size_t ptc = thread::hardware_concurrency() * 2;for(size_t batch = 0; batch < ptc; batch++){size_t batch_each = COUNT / ptc;if (batch == ptc - 1){batch_each = COUNT - (COUNT / ptc * batch);}result.push_back(async(do_sum, &data, batch * batch_each, batch_each));}long total = 0;for(size_t batch = 0; batch < ptc; batch++){total += result[batch].get();}cout << "total=" << total << endl; }

如果是在多核或者多 CPU 的環(huán)境上面運(yùn)行上述例子,仔細(xì)觀察輸出結(jié)果,可能會(huì)發(fā)現(xiàn)有些線程 ID 是重復(fù)的,這說明重復(fù)使用了線程,也就是說,通過使用 async() 還可達(dá)到一些線程池的功能。

幾個(gè)需要注意的地方

thread 同時(shí)也是棉線、毛線、絲線等意思,我想大家都能體會(huì)面對(duì)一團(tuán)亂麻不知從何處查找頭緒的感受,不要忘了,線程不是靜態(tài)的,它是不斷變化的,請(qǐng)想像一下面對(duì)一團(tuán)會(huì)動(dòng)態(tài)變化的亂麻的情景。所以,使用多線程技術(shù)的首要準(zhǔn)則是我們自己要十分清楚我們的線程在哪里?線頭(線程入口和出口)在哪里?先安排好線程的運(yùn)行,注意不同線程的交叉點(diǎn)(訪問或者修改同一個(gè)資源,包括內(nèi)存、I/O 設(shè)備等),盡量減少線程的交叉點(diǎn),要知道幾條線堆在一起最怕的是互相打結(jié)。

當(dāng)我們的確需要不同線程訪問一個(gè)共同的資源時(shí),一般都需要進(jìn)行加鎖保護(hù),否則很可能會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,從而出現(xiàn)各種時(shí)現(xiàn)時(shí)不現(xiàn)的莫名其妙的問題,加鎖保護(hù)時(shí)有幾個(gè)問題需要特別注意:一是一個(gè)線程內(nèi)連續(xù)多次調(diào)用非遞歸鎖 (non-recursive lock) 的加鎖動(dòng)作,這很可能會(huì)導(dǎo)致異常;二是加鎖的粒度;三是出現(xiàn)死鎖 (deadlock),多個(gè)線程互相等待對(duì)方釋放鎖導(dǎo)致這些線程全部處于罷工狀態(tài)。

第一個(gè)問題只要根據(jù)場(chǎng)景調(diào)用合適的鎖即可,當(dāng)我們可能會(huì)在某個(gè)線程內(nèi)重復(fù)調(diào)用某個(gè)鎖的加鎖動(dòng)作時(shí),我們應(yīng)該使用遞歸鎖 (recursive lock),在 C++11 中,可以根據(jù)需要來使用 recursive_mutex,或者 recursive_timed_mutex。

第二個(gè)問題,即鎖的粒度,原則上應(yīng)該是粒度越小越好,那意味著阻塞的時(shí)間越少,效率更高,比如一個(gè)數(shù)據(jù)庫,給一個(gè)數(shù)據(jù)行 (data row) 加鎖當(dāng)然比給一個(gè)表 (table) 加鎖要高效,但是同時(shí)復(fù)雜度也會(huì)越大,越容易出錯(cuò),比如死鎖等。

對(duì)于第三個(gè)問題我們需要先看下出現(xiàn)死鎖的條件:

  • 資源互斥,某個(gè)資源在某一時(shí)刻只能被一個(gè)線程持有 (hold);
  • 吃著碗里的還看著鍋里的,持有一個(gè)以上的互斥資源的線程在等待被其它進(jìn)程持有的互斥資源;
  • 不可搶占,只有在某互斥資源的持有線程釋放了該資源之后,其它線程才能去持有該資源;
  • 環(huán)形等待,有兩個(gè)或者兩個(gè)以上的線程各自持有某些互斥資源,并且各自在等待其它線程所持有的互斥資源。
  • 我們只要不讓上述四個(gè)條件中的任意一個(gè)不成立即可。在設(shè)計(jì)的時(shí)候,非常有必要先分析一下會(huì)否出現(xiàn)滿足四個(gè)條件的情況,特別是檢查有無試圖去同時(shí)保持兩個(gè)或者兩個(gè)以上的鎖,當(dāng)我們發(fā)現(xiàn)試圖去同時(shí)保持兩個(gè)或者兩個(gè)以上的鎖的時(shí)候,就需要特別警惕了。下面我們來看一個(gè)簡化了的死鎖的例子:

    清單 19.例子 thread_deadlock.cc
    static mutex g_mutex1, g_mutex2; static void inc1(int *p ){for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex2.lock();// do something.g_mutex2.unlock();g_mutex1.unlock();} } static void inc2(int *p ){for(int i = 0; i < COUNT; i++){g_mutex2.lock();g_mutex1.lock();(*p)++;g_mutex1.unlock();// do other thing.g_mutex2.unlock();} } void threadMutex(void){int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }

    在這個(gè)例子中,g_mutex1 和 g_mutex2 都是互斥的資源,任意時(shí)刻都只有一個(gè)線程可以持有(加鎖成功),而且只有持有線程調(diào)用 unlock 釋放鎖資源的時(shí)候其它線程才能去持有,滿足條件 1 和 3,線程 ta 持有了 g_mutex1 之后,在釋放 g_mutex1 之前試圖去持有 g_mutex2,而線程 tb 持有了 g_mutex2 之后,在釋放 g_mutex2 之前試圖去持有 g_mutex1,滿足條件 2 和 4,這種情況之下,當(dāng)線程 ta 試圖去持有 g_mutex2 的時(shí)候,如果 tb 正持有 g_mutex2 而試圖去持有 g_mutex1 時(shí)就發(fā)生了死鎖。在有些環(huán)境下,可能要多次運(yùn)行這個(gè)例子才出現(xiàn)死鎖,實(shí)際工作中這種偶現(xiàn)特性讓查找問題變難。要破除這個(gè)死鎖,我們只要按如下代碼所示破除條件 3 和 4 即可:

    清單 20.例子 thread_break_deadlock.cc
    static mutex g_mutex1, g_mutex2; static voi inc1(int *p ){for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex1.unlock();g_mutex2.lock();// do something.g_mutex2.unlock();} } static void inc2(int *p ){for(int i = 0; i < COUNT; i++){g_mutex2.lock();// do other thing.g_mutex2.unlock();g_mutex1.lock();(*p)++;g_mutex1.unlock();} } void threadMutex(void){int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }

    在一些復(fù)雜的并行編程場(chǎng)景,如何避免死鎖是一個(gè)很重要的話題,在實(shí)踐中,當(dāng)我們看到有兩個(gè)鎖嵌套加鎖的時(shí)候就要特別提高警惕,它極有可能滿足了條件 2 或者 4。

    結(jié)束語

    上述例子在 CentOS 6.5,g++ 4.8.1/g++4.9 以及 clang 3.5 下面編譯通過,在編譯的時(shí)候,請(qǐng)注意下述幾點(diǎn):

    • 設(shè)置 -std=c++11;
    • 鏈接的時(shí)候設(shè)置 -pthread;
    • 使用 g++編譯鏈接時(shí)設(shè)置 -Wl,--no-as-needed 傳給鏈接器,有些版本的 g++需要這個(gè)設(shè)置;
    • 設(shè)置宏定義 -D_REENTRANT,有些庫函數(shù)是依賴于這個(gè)宏定義來確定是否使用多線程版本的。

    具體可以參考本文所附的代碼中的 Makefile 文件。

    在用 gdb 調(diào)試多線程程序的時(shí)候,可以輸入命令 info threads 查看當(dāng)前的線程列表,通過命令 thread n 切換到第 n 個(gè)線程的上下文,這里的 n 是 info threads 命令輸出的線程索引數(shù)字,例如,如果要切換到第 2 個(gè)線程的上下文,則輸入命令 thread 2。

    聰明地使用多線程,擁抱多線程吧。

    參考資料

    學(xué)習(xí)

    • 關(guān)于 thread 的定義可參考?http://en.wikipedia.org/wiki/Thread?和http://en.wikipedia.org/wiki/Thread_%28computing%29。
    • 關(guān)于 C++11 標(biāo)準(zhǔn)中 thread 以及其它相關(guān)類的聲明可以參考草案N3242。
    • 參考 Bjarne Stroustrup 的《The C++ Programming Language》第 4 版是 C++11 的權(quán)威著作。
    • 訪問 developerWorks?Linux 專區(qū),了解關(guān)于信息管理的更多信息,獲取技術(shù)文檔、how-to 文章、培訓(xùn)、下載、產(chǎn)品信息以及其他資源。

    討論

    • 加入?developerWorks 中文社區(qū)。查看開發(fā)人員推動(dòng)的博客、論壇、組和維基,并與其他 developerWorks 用戶交流。

    總結(jié)

    以上是生活随笔為你收集整理的[C++11 std::thread] 使用C++11 编写 Linux 多线程程序的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。