日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

C 11实现的100行线程池

發布時間:2023/12/2 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 C 11实现的100行线程池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【導讀】:C 線程池一直都是各位程序員們造輪子的首選項目之一。今天,小編帶大家一起來看看這個輕量的線程池,本線程池是header-only的,并且整個文件只有100行,其中C 的高級用法有很多,很值得我們學習,一起來看看吧。

以下是正文


線程池

C 帶有線程操作,異步操作,就是沒有線程池,至于線程池的概念,我先搜一下別人的解釋:


一般而言,線程池有以下幾個部分:

1. 完成主要任務的一個或多個線程。


2. 用于調度管理的管理線程。

3. 要求執行的任務隊列。

我來講講人話:你的函數需要在多線程中運行,但是你又不能每來一個函數就開啟一個線程,所以你就需要固定的N個線程來跑執行,但是有的線程還沒有執行完,有的又在空閑,如何分配任務呢,你就需要封裝一個線程池來完成這些操作,有了線程池這層封裝,你就只需要告訴它開啟幾個線程,然后直接塞任務就行了,然后通過一定的機制獲取執行結果。


這里有一個100行實現線程池的操作:

https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

分析源代碼 頭文件

#include?#include?#include?#include?#include?#include?#include?#include?#include?

vector,queue,momory 都沒啥說的,thread線程相關,mutex 互斥量,解決資源搶占問題,condition_variable 條件量,用于喚醒線程和阻塞線程,future 從使用的角度出發,它是一個獲取線程數據的函數。functional 函數子,可以理解為規范化的函數指針。stdexcept 就跟它的名字一樣,標準異常。

class?ThreadPool?{public:????ThreadPool(size_t);????template<class?F,?class...?Args>????auto?enqueue(F&&?f,?Args&&...?args)?????????->?std::future<typename?std::result_of::type>;????~ThreadPool();private:????//?need?to?keep?track?of?threads?so?we?can?join?them????std::vector<?std::thread?>?workers;????//?the?task?queue????std::queue<?std::function?>?tasks;?????//?synchronization????std::mutex?queue_mutex;????std::condition_variable?condition;????bool?stop;};

線程池的聲明,構造函數,一個enqueue模板函數 返回std::future, 然后這個type又利用了運行時檢測(還是編譯時檢測?)推斷出來的,非常的amazing啊。成功的使用一行代碼反復套娃,這高階的用法就是大佬的水平嗎,i了i了。


workers 是vector<:thread> 俗稱工作線程。


std::queue<std::function> tasks 俗稱任務隊列。


那么問題來了,這個任務隊列的任務只能是void() 類型的嗎?感覺沒那么簡單,還得接著看吶。


mutex,condition_variable 沒啥講的,stop 控制線程池停止的。

// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(size_t threads) ? ?: ? stop(false){ ? ?for(size_t i = 0;i ? ? ? ?workers.emplace_back( ? ? ? ? ? ?[this] ? ? ? ? ? ?{ ? ? ? ? ? ? ? ?for(;;) ? ? ? ? ? ? ? ?{ ? ? ? ? ? ? ? ? ? ?std::function task;{ ? ? ? ? ? ? ? ? ? ? ? ?std::unique_lock<:mutex> lock(this->queue_mutex); ? ? ? ? ? ? ? ? ? ? ? ?this->condition.wait(lock, ? ? ? ? ? ? ? ? ? ? ? ? ? ?[this]{ return this->stop || !this->tasks.empty(); }); ? ? ? ? ? ? ? ? ? ? ? ?if(this->stop && this->tasks.empty()) ? ? ? ? ? ? ? ? ? ? ? ? ? ?return; ? ? ? ? ? ? ? ? ? ? ? ?task = std::move(this->tasks.front()); ? ? ? ? ? ? ? ? ? ? ? ?this->tasks.pop(); ? ? ? ? ? ? ? ? ? ?}task(); ? ? ? ? ? ? ? ?} ? ? ? ? ? ?} ? ? ? ?);}

大佬寫的注釋就是這么樸實無華,說這個構造函數僅僅是把一定數量的線程塞進去,我是看了又看才悟出來這玩意是什么意思……雖然本質上的確是它說的只是把線程塞進去,但是這個線程也太繞了。


workers.emplace_back 參數是一個lambda表達式,不會阻塞,也就是說最外層的是一個異步函數,每個線程里面的事情才是重點。


labmda表達式中最外層是一個死循環,至于為什么是for(;;)而不是while(1) 這雖然不是重點,不過大佬的用法還是值得揣摩的,我估計是效率會更高?


task 申明后,緊跟著一個大括號,這個{}里面的部分,是一個同步操作,至于為什么用this->lock 而不是直接使用[&]來捕獲參數,想來也是處于內存考慮。精打細算的風格像極了摳門的地主,i了i了。


緊接著一個wait(lock,condtion)的操作,像極了千層餅的套路。


第一層:這TM不是要鎖死自己啊?這樣不是構造都得卡死?


第二層:我們看到它emplace_back了一個線程,不會阻塞,但是等開鎖,鎖不就在它自己的線程里面嘛?那不得鎖死了啊?


第三層:我們看到這個lock其實只是個包裝,真正的鎖是外層的mutex,所以從這里是不存在死鎖的。但是你的wait的condition怎么可能不懂呢,必須要 stop 或者 !empty 才wait嗎?


第四層:我們查資料發現后面的condition是返回false才會wait,也就是說要!stop && empty才會wait,就是說這個線程池是 運行態,并且沒有任務才才會執行等待操作!否則就不等了,直接沖!


第五層:既然你判斷了上面判斷了stop和非空,為啥下面還要判斷stop和空才退出呢?不顯得冗余?


第六層:要確定它的確是被置為stop了,且隊列執行空了,它才能夠光榮退休。有沒有問題呢,有,最后所有線程都阻塞了,你stop置為true它們也不知道啊……


我估計它的stop會有喚醒所有線程的操作,不過如果有的在執行,有的在等待,應該沒辦法都通知到位,但是在執行的在下一次判斷的時候也能正常退出。


因為有了疑惑,我們就想看stop相關的操作,結果發現放在了析構函數里面……

//?the?destructor?joins?all?threadsinline?ThreadPool::~ThreadPool(){????{????????std::unique_lock<:mutex>?lock(queue_mutex);????????stop?=?true;????}????condition.notify_all();????for(std::thread?&worker:?workers)????????worker.join();}

{}里面上鎖進行了stop為true的操作,至于為什么不用原子操作,我也不知道,但是仔細想了下大概是因為本來就有一把鎖了,再用原子就不是內味兒了。然后它果然通知了所有,并且還把工作線程join了。也就是等它們結束。

結束了千層餅の解析之后,我們看看最重要的入隊操作

// add new work item to the pooltemplate<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args) ? ?-> std::future<typename std::result_of::type>{ ? ?using return_type = typename std::result_of::type;auto task = std::make_shared< std::packaged_task >( ? ? ? ? ? ?std::bind(std::forward(f), std::forward(args)...) ? ? ? ?); ? ?std::future res = task->get_future(); ? ?{ ? ? ? ?std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the pool ? ? ? ?if(stop) ? ? ? ? ? ?throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });

總結

以上是生活随笔為你收集整理的C 11实现的100行线程池的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。