【C++11应用】基于C++11及std::thread实现的线程池

时间:2024-01-02 20:21:08

基于C++11及std::thread实现的线程池

线程池源码:

#pragma once
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
/*
* Example:
*
* // 初始回调
* void InitCb() {
* static int i = 0;
* printf("InitCb: %d\n", i++);
* }
*
* // 任务函数
* void TaskCb() {
* static int i = 0;
* printf("TaskCb: %d\n", i++);
* }
*
* // 实例化任务队列上限为10的线程池（10是队列容量，线程数由Start指定）
* std::unique_ptr<ThreadPool> upTp(new ThreadPool(10, std::bind(InitCb)));
* // 启动5个线程
* upTp->Start(5);
* do {
* // 投递任务进行处理
* upTp->InputTask(std::bind(TaskCb));
* } while(true);
*
*/
// Worker pool built on std::thread with an optionally bounded FIFO task queue.
//
// Lifecycle: construct -> Start(n) once -> InputTask(...) -> Stop().
// The destructor calls Stop(), so an explicit Stop() is optional.
//
// Threading contract:
//  - Start() and Stop() must be called from a single controlling thread, and
//    Start() must have returned before other threads call InputTask().
//  - InputTask() may be called concurrently from any number of threads.
//  - The pool is not restartable: Start() asserts that no workers exist, and
//    Stop() keeps the (joined) thread objects around.
class ThreadPool
{
public:
    // Unit of work executed by the pool; also the type of the init callback.
    using Task = std::function<void()>;

    ThreadPool(const ThreadPool& other) = delete;
    ThreadPool& operator=(const ThreadPool& other) = delete;

    // max_queue_size: maximum number of queued (not yet started) tasks;
    //                 0 means the queue is unbounded.
    // cb:             optional init callback. It is run once by every worker
    //                 thread before it starts taking tasks, or once on the
    //                 caller's thread by Start(0). May be empty.
    ThreadPool(size_t max_queue_size, Task cb)
        : cb_(std::move(cb)),
          max_queue_size_(max_queue_size),
          running_(false)
    {
    }

    // Stops the workers and joins them. Tasks still sitting in the queue are
    // destroyed without being run.
    ~ThreadPool()
    {
        Stop();
    }

    // Launches num_threads workers. With num_threads == 0 no thread is
    // created; the init callback (if any) runs on the calling thread and
    // every later InputTask() executes its task inline.
    void Start(unsigned num_threads)
    {
        assert(threads_.empty());
        {
            std::lock_guard<std::mutex> lk(mtx_);
            running_ = true;
        }
        threads_.reserve(num_threads);
        for (unsigned i = 0; i < num_threads; ++i)
        {
            threads_.emplace_back(&ThreadPool::Process, this, i);
        }
        if (num_threads == 0 && cb_)
        {
            cb_();
        }
    }

    // Asks all workers to exit and waits for them. A worker finishes the task
    // it is currently running but does not pick up another one; tasks left in
    // the queue are not executed. Producers blocked in InputTask() on a full
    // queue are released (their task is dropped). Safe to call repeatedly.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            running_ = false;
        }
        // The flag was changed under the mutex, so waiters cannot miss it
        // even though the notifications are sent after unlocking.
        not_empty_cond_.notify_all();
        not_full_cond_.notify_all();
        for (auto& th : threads_)
        {
            // joinable() guard makes a second Stop() a no-op instead of
            // throwing std::system_error from join().
            if (th.joinable())
            {
                th.join();
            }
        }
    }

    // Submits a task.
    //  - No worker threads: the task runs synchronously on the caller.
    //  - Otherwise the task is queued; if the queue is bounded and full the
    //    call blocks until a slot frees up or the pool is stopped.
    //  - If the pool has been stopped (before or while waiting) the task is
    //    silently dropped.
    void InputTask(Task task)
    {
        if (threads_.empty())
        {
            task();
            return;
        }

        std::unique_lock<std::mutex> lk(mtx_);
        not_full_cond_.wait(lk, [this] { return !running_ || !IsFull(); });
        if (!running_)
        {
            return;
        }
        queue_.push_back(std::move(task));
        lk.unlock();
        not_empty_cond_.notify_one();
    }

private:
    // Worker thread body: prints its index, runs the init callback, then
    // executes tasks until Pop() reports that the pool was stopped.
    void Process(unsigned nId)
    {
        printf("current thread id: %u\n", nId);
        try
        {
            if (cb_)
            {
                cb_();
            }
            Task task;
            while (Pop(task))
            {
                // An empty std::function may have been queued; skip it.
                if (task)
                {
                    task();
                }
                // Release the task's captured state before blocking again.
                task = nullptr;
            }
        }
        catch (...)
        {
            // TODO: decide on an error policy. Rethrowing from a thread entry
            // point ends in std::terminate, which is the current behaviour.
            throw;
        }
    }

    // Blocks until a task is available or the pool is stopped.
    // Returns true and moves the oldest task into `task`, or returns false
    // (leaving `task` untouched) once the pool has been stopped.
    // running_ is only ever read here under mtx_, so workers never race with
    // Stop() on the flag.
    bool Pop(Task& task)
    {
        std::unique_lock<std::mutex> lk(mtx_);
        not_empty_cond_.wait(lk, [this] { return !running_ || !queue_.empty(); });
        if (!running_)
        {
            return false;
        }
        task = std::move(queue_.front());
        queue_.pop_front();
        lk.unlock();
        if (max_queue_size_ > 0)
        {
            // Only a bounded queue can have producers waiting for space.
            not_full_cond_.notify_one();
        }
        return true;
    }

    // True when the queue is bounded and at capacity. Caller must hold mtx_.
    bool IsFull() const
    {
        return max_queue_size_ > 0 && queue_.size() >= max_queue_size_;
    }

    mutable std::mutex mtx_;                 // guards queue_ and running_
    std::condition_variable not_empty_cond_; // signalled on push and on stop
    std::condition_variable not_full_cond_;  // signalled on pop and on stop
    Task cb_;                                // per-worker init callback, may be empty
    std::vector<std::thread> threads_;       // written only by Start()/Stop()
    std::deque<Task> queue_;                 // pending tasks, FIFO
    size_t max_queue_size_;                  // 0 == unbounded
    bool running_;                           // true between Start() and Stop()
};

参考:陈硕——《Linux多线程服务器编程》