日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > c/c++ >内容正文

c/c++

用C++11 实现 thread pool

發(fā)布時(shí)間:2024/4/18 c/c++ 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用C++11 实现 thread pool 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

最近看了看我的計(jì)劃,寫道這里也算是到了一半,大部分都是講的單一的C++11的用法,基本都是理論知識(shí),就像我上大學(xué)的時(shí)候,老師一直講理論知識(shí),結(jié)局就是能去能不去的時(shí)候,我選擇了后者。所以在這里穿插一下小的綜合運(yùn)用文章,讓大家知道為什么要用C++11,C++11好在哪里,項(xiàng)目中如何運(yùn)用C++11.

首先介紹一下背景。在我們的工作中,避免不了多線程之間的配合。在現(xiàn)在處理器動(dòng)輒8核16核的背景下,如果我們的程序還停留在單線程的模型,那么我們就沒法享受多處理器帶來的性能提成。之前看過我司代碼中的threadpool。寫的那叫一個(gè)滴水不漏,每個(gè)小細(xì)節(jié)都有大量的代碼去實(shí)現(xiàn)。不但非常冗長,而且以我的智商基本上讀不懂。唯一的有點(diǎn)就是:真穩(wěn)定。不過threadpool的模型已經(jīng)很難融入現(xiàn)代C++了。

所以有必要通過C++11來重新實(shí)現(xiàn)一下threadpool,對(duì)比一下modern

C++和C98.

1. 為什么要有threadpool?

如果談?wù)搕hreadpool,你會(huì)想到有什么功能呢?

傳統(tǒng)的模型大概是這樣的,把一個(gè)函數(shù)指針傳給threadpool。然后thread們會(huì)在合適的時(shí)候調(diào)用這個(gè)函數(shù)。那么還有一個(gè)問題就是函數(shù)的返回值怎么傳遞回調(diào)用的線程。這個(gè)功能往往有很多種方法,我司的思路就是調(diào)用你的callback將函數(shù)返回值返回給你。當(dāng)然不是返回給調(diào)用函數(shù)的線程。

以上的描述中反映的threadpool的兩個(gè)最基本的需求:

  • ?可以把一個(gè)可執(zhí)行的對(duì)象扔給threadpool去執(zhí)行。

  • 可以把執(zhí)行的返回值帶回。

  • 其實(shí)這就是threadpool存在的合理性-- 把工作扔給它,我只要知道結(jié)果就行。當(dāng)然任務(wù)扔給threadpool后,你就可以去干一些別的工作。

    有人會(huì)說,扔給threadpool,無非是讓別的線程去干活,干的總活并沒有減少。相反,一些threadpool的開銷反而讓工作變的更慢。至于這個(gè)問題我想用redis來舉例子。

    眾所周知,redis最新版本支持的多線程。redis的作者在解釋為什么引入多線程的時(shí)候說過。在他們維護(hù)redis的時(shí)候,發(fā)現(xiàn)redis的瓶頸竟然出現(xiàn)在分配內(nèi)存上(從socket上拷貝內(nèi)存)。所以你會(huì)發(fā)現(xiàn)redis起了多線,只是為了加速內(nèi)存拷貝,最終的邏輯還是在一個(gè)線程執(zhí)行的。所以可以看出,可以把較慢的代碼或者可以流水操作的代碼讓不同的線程執(zhí)行。

    ?

    2.?現(xiàn)代化threadpool提出了什么更高的要求?

    之前我們分享過std::function。std::function 是C++11提供的可執(zhí)行代碼的包裝器,它可以是一個(gè)普通函數(shù),或者是函數(shù)指針,或者是lambda...,所以對(duì)于我們來說,threadpool也要支持std::function能支持的類型。

    關(guān)于返回值,還有如何返回到calling thread,之前我們也分享過std::future.

    還有就是線程間的同步,之前我們分享過 std::condition_variable。

    還有就是thread的包裝器std::packaged_task.

    至此我們湊齊了實(shí)現(xiàn)threadpool的幾大件,下面我們看看如何來實(shí)現(xiàn)它

    3. 原理:

    3.1 對(duì)象定義

    要實(shí)現(xiàn)一個(gè)threadpool。我們要有以下的信息:

  • 我們要有個(gè)結(jié)構(gòu)體,記住我們控制的thread。

  • 我們要有個(gè)結(jié)構(gòu)體,記住我們要做的事情。

  • 我們要有個(gè)condition_variable來做線程間同步。

  • 為了優(yōu)雅的推出,我們要有個(gè)標(biāo)志位,標(biāo)志著我現(xiàn)在想推出了,大家都退下吧。

  • 功能上要有:

  • 構(gòu)造函數(shù)

  • 析構(gòu)函數(shù)

  • 最重要的 -- 添加任務(wù)的函數(shù)

  • class ThreadPool {public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:// need to keep track of threads so we can join themstd::vector<std::thread> workers;// the task queuestd::queue<std::function<void()>> tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop; };

    3.2?初始化

    這里構(gòu)建了我們需要的thread。并把它放在一個(gè)vector里。

    這個(gè)thread只干一件事,那就是等condition_variable的通知,如果有通知,那么從task queue里邊拿出一個(gè)task,并執(zhí)行該task。

    當(dāng)然還有一些判斷是否退出的邏輯。

    inline ThreadPool::ThreadPool(size_t threads): stop(false) {for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}}); }

    4. 添加任務(wù)API

    說到函數(shù),不可避免的就是函數(shù)的實(shí)現(xiàn)和函數(shù)的參數(shù),為了實(shí)現(xiàn)支持不同的類型,我們選擇了用模板來適配。

    同時(shí)為了得到返回值,我們將返回值設(shè)置成了future。那么問題來了,這該如何實(shí)現(xiàn)?是不是想起了packaged_task??如果忘了,回憶一下吧。

    packaged_task可以將可執(zhí)行的工作打包,然后獲取它的future。

    至此我們就可以實(shí)現(xiàn)我們的功能了。思路就是來了一個(gè)可執(zhí)行的工作,首先封裝成packaged_task。然后把這個(gè)task放到task queue中。并且通知一個(gè)線程說queue里邊有東西了,趕緊去干活。

    在返回之前,得到它的future并返回。

    實(shí)現(xiàn)如下:

    template <class F, class... Args> auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res; }

    至此,所有功能都實(shí)現(xiàn)了,有了C++11,是不是一切都變得美好了起來,用了60行就實(shí)現(xiàn)了以前無數(shù)行才能實(shí)現(xiàn)的功能,而且簡單易懂,支持現(xiàn)代化的C++調(diào)用。

    5.?完整代碼

    class ThreadPool {public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop; }; template <class F, class... Args> auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res; } inline ThreadPool::ThreadPool(size_t threads): stop(false) {for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}}); } inline ThreadPool::~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join(); } #include <iostream> #include <vector> #include <chrono> #include <map> #include <thread> #include <string> #include <tuple> #include "MThreadPool.hpp" #include <log4cplus/logger.h> //#include <log4cplus/consoleappender.h> #include <log4cplus/fileappender.h> #include <log4cplus/layout.h> //#include <log4cplus/ndc.h> //#include <log4cplus/mdc.h> #include <log4cplus/helpers/loglog.h> #include <log4cplus/thread/threads.h> //#include <log4cplus/helpers/sleep.h> #include <log4cplus/loggingmacros.h>using namespace std; using namespace log4cplus; using namespace log4cplus::helpers; Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("Max:"));using namespace std; typedef tuple<string, int> M_TUPLE; typedef std::function<M_TUPLE()> M_FUNCTION; vector<std::function<tuple<string, int>()>> M_VECTOR; typedef std::lock_guard<std::recursive_mutex> M_GUARD; typedef std::unique_lock<std::recursive_mutex> M_UNIQUE; std::recursive_mutex func_mutex;enum RET_CODE {M_SUCCESS = 0,M_FAIL,M_MAX };M_TUPLE M_put() {std::string M_func(__func__);// fucntion bodyLOG4CPLUS_DEBUG(logger, "hello!");std::this_thread::sleep_for(std::chrono::microseconds(100)); //seconds(1));LOG4CPLUS_DEBUG(logger, "world!");return std::make_tuple(M_func, M_SUCCESS); } void M_LOG() { }int main() {log4cplus::initialize();try{SharedObjectPtr<Appender> append_1(new FileAppender("Test.log"));append_1->setName(LOG4CPLUS_TEXT("First"));log4cplus::tstring pattern = LOG4CPLUS_TEXT("[%d{%m/%d/%y %H:%M:%S,%Q}] %c %-5p - %m [%l]%n");// std::tstring pattern = LOG4CPLUS_TEXT("%d{%c} [%t] %-5p [%.15c{3}] %%%x%% - %m [%l]%n");append_1->setLayout(std::auto_ptr<Layout>(new PatternLayout(pattern)));Logger::getRoot().addAppender(append_1);logger.setLogLevel(DEBUG_LOG_LEVEL);}catch (...){Logger::getRoot().log(FATAL_LOG_LEVEL, LOG4CPLUS_TEXT("Exception occured..."));}LOG4CPLUS_DEBUG(logger, "set logger done!"<< "\nhello log4cplus\n");int thread_num = std::thread::hardware_concurrency();if (!thread_num){thread_num = 1;}M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);std::cout << " start " << thread_num << "threads" << std::endl;ThreadPool pool(thread_num);std::vector<std::future<M_TUPLE>> results;M_FUNCTION tmp;while (!M_VECTOR.empty()){{M_GUARD lock1(func_mutex);tmp = M_VECTOR.back();}results.emplace_back(pool.enqueue([=] {return tmp();}));{M_GUARD lock1(func_mutex);M_VECTOR.pop_back();}}for (auto &&result : results){std::string tmp_str;int tmp_bool;tie(tmp_str, tmp_bool) = result.get();cout << "string is " << tmp_str << "bool is " << tmp_bool << endl;}std::cout << std::flush;return 0; }

    ?

    總結(jié)

    以上是生活随笔為你收集整理的用C++11 实现 thread pool的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。