c++11线程池
線程池的作用
避免頻繁地創建和銷毀線程,從而節約系統資源。
線程池的分類
線程池的實現分為兩類:
1)半同步半異步的線程池
2)領導者和追隨者模式
參考:
https://blog.csdn.net/vjhghjghj/article/details/103956236?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-1&spm=1001.2101.3001.4242
半同步半異步的線程池
共分為三層:第一層是同步服務層,負責處理來自上層的任務請求;第二層是同步隊列,用於存放待處理的任務;第三層是異步服務層,由線程池中的線程從隊列中取出任務並執行。
同步隊列
功能:存儲待處理的任務,交給線程池來處理。
實現:用互斥鎖保證線程同步,用條件變量實現線程間通信。隊列為空時,取任務的線程等待;隊列不空時,通知一個線程去處理。隊列滿時,添加任務的線程等待;隊列不滿時,通知上層繼續添加新的任務。
#pragma once
#include<condition_variable>
#include<cstddef>
#include<iostream>
#include<list>
#include<mutex>
#include<thread>
#include<utility>
/// Bounded, blocking FIFO queue shared between producer and consumer threads.
///
/// Producers block in Add()/put() while the queue already holds maxSize
/// items; consumers block in Take() while it is empty. Stop() wakes every
/// blocked thread and makes all later Add()/put()/Take() calls return
/// immediately without touching the queue.
template<typename T>
class SyncQueue {
public:
    /// @param maxSize number of queued items at which producers start to block.
    SyncQueue(const int maxSize) : m_maxSize(maxSize), m_needStop(false) {}

    /// Unsynchronised "has room" check used as the producer wait predicate,
    /// i.e. it is evaluated while m_mutex is already held. Logs a message
    /// whenever the queue is full. Code outside the class should call Full().
    bool notFull() const {
        const bool full = m_queue.size() >= static_cast<std::size_t>(m_maxSize);
        if (full)
            std::cout << "緩沖區滿了,不能添加了" << std::endl;
        return !full;
    }

    /// Unsynchronised "has items" check used as the consumer wait predicate,
    /// i.e. it is evaluated while m_mutex is already held. Code outside the
    /// class should call Empty().
    bool notEmpty() const {
        return !m_queue.empty();
    }

    /// Appends an item, blocking while the queue is full.
    /// @param x value perfectly forwarded into the new element.
    /// @return true if the item was queued, false if the queue was stopped.
    template<typename F>
    bool Add(F&& x) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] { return m_needStop || notFull(); });
        if (m_needStop)
            return false;
        m_queue.emplace_back(std::forward<F>(x));
        m_notEmpty.notify_one();
        return true;
    }

    /// Queues a copy of x. See Add() for blocking behaviour and return value.
    bool put(const T& x) {
        return Add(x);
    }

    /// Queues x by moving from it. See Add() for blocking behaviour and
    /// return value.
    bool put(T&& x) {
        return Add(std::move(x));
    }

    /// Removes every queued item at once, blocking while the queue is empty.
    /// @param list receives the items in FIFO order; left untouched on stop.
    /// @return true if items were transferred, false if the queue was stopped.
    bool Take(std::list<T>& list) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || notEmpty(); });
        if (m_needStop)
            return false;
        list = std::move(m_queue);
        // A moved-from list is only "valid but unspecified"; make it empty
        // explicitly so the size checks stay meaningful.
        m_queue.clear();
        // Every slot was freed, so wake all blocked producers, not just one.
        m_notFull.notify_all();
        return true;
    }

    /// Removes the oldest item, blocking while the queue is empty.
    /// @param t receives the item; left untouched on stop.
    /// @return true if an item was taken, false if the queue was stopped.
    bool Take(T& t) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || notEmpty(); });
        if (m_needStop)
            return false;
        t = std::move(m_queue.front());
        m_queue.pop_front();
        m_notFull.notify_one();
        return true;
    }

    /// Marks the queue as stopped and wakes every blocked producer and
    /// consumer. Items still queued are not removed.
    void Stop() {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        // Notify after releasing the lock so woken threads can proceed at once.
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    /// @return true if no item is queued (snapshot taken under the lock).
    bool Empty() const {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    /// @return true if the queue has reached its capacity (snapshot taken
    ///         under the lock). Uses the same >= test as notFull().
    bool Full() const {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() >= static_cast<std::size_t>(m_maxSize);
    }

    /// @return number of queued items (snapshot taken under the lock).
    std::size_t Size() const {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    /// Same as Size(); kept as a separate name for existing callers.
    std::size_t Count() const {
        return Size();
    }

private:
    std::list<T> m_queue;                 // pending items, oldest at the front
    mutable std::mutex m_mutex;           // guards m_queue and m_needStop
    std::condition_variable m_notEmpty;   // signalled when an item is added
    std::condition_variable m_notFull;    // signalled when room is freed
    int m_maxSize;                        // capacity limit for producers
    bool m_needStop;                      // set once by Stop()
};
?
線程池實現
#pragma once
#include"syncQueue.h"
#include<functional>
#include<atomic>
// Capacity of the pool's task queue: ThreadPool passes this value to the
// SyncQueue constructor.
const int MaxTaskCount = 10;
class ThreadPool {
public:
?? ?using Task = std::function<void()>;
?? ?ThreadPool(int numThreads=std::thread::hardware_concurrency()):m_queue(MaxTaskCount) {
?? ??? ?Start(numThreads);
?? ?}
?? ?~ThreadPool()
?? ?{
?? ??? ?Stop();
?? ?}
?? ?void Stop() {
?? ??? ?std::call_once(m_flag, [this] {StopThreadGroup(); });
?? ?}
?? ?void AddTask(Task&& task) {
?? ??? ?m_queue.put(std::forward<Task>(task));
?? ?}
?? ?void AddTask(const Task& task) {
?? ??? ?m_queue.put(task);
?? ?}
private:
?? ?void Start(int numThreads) {
?? ??? ?m_runing = true;
?? ??? ?for (int i = 0; i < numThreads; i++) {
?? ??? ??? ?m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
?? ??? ?}
?? ?}
?? ?void StopThreadGroup() {
?? ??? ?while (m_queue.notEmpty());
?? ??? ?m_queue.Stop();
?? ??? ?m_runing = false;
?? ??? ?for (auto thread : m_threadgroup) {
?? ??? ??? ?if (thread)
?? ??? ??? ??? ?thread->join();
?? ??? ?}
?? ??? ?m_threadgroup.clear();
?? ?}
?? ?void RunInThread() {
?? ??? ?Task task;
?? ??? ?while (m_runing) {
?? ??? ??? ?
?? ??? ??? ?m_queue.Take(task);
?? ??? ??? ?if (!m_runing)
?? ??? ??? ??? ?return;
?? ??? ??? ?task();
?? ??? ??? ?/*
?? ??? ??? ?std::list<Task> list;
?? ??? ??? ?m_queue.Take(list);
?? ??? ??? ?std::cout <<"list.size:"<< list.size() << std::endl;
?? ??? ??? ?for (auto& task : list) {
?? ??? ??? ??? ?if (!m_runing)
?? ??? ??? ??? ??? ?return;
?? ??? ??? ??? ?task();
?? ??? ??? ?}
?? ??? ??? ?*/
?? ??? ?}
?? ?}
?? ?std::list<std::shared_ptr<std::thread>> m_threadgroup;
?? ?SyncQueue<Task> m_queue;
?? ?std::atomic_bool m_runing;
?? ?std::once_flag m_flag;
};
?
?
總結
- 上一篇: C++11与设计模式的交流
- 下一篇: s3c2440移植MQTT