、为什么需要线程池? 部分应用程序需要执行很多细小的任务,对于每个任务都创建一个线程来完成,任务完成后销毁线程,而这就会产生一个问题:当执行的任务所需要的时间T1小于等于创建线程时间T2和销毁线程时间T3总和时即T1 <= T2 + T3,应用处理任务的响应能力会大大减弱,从而影响了应用程序性能,为了解决这类问题,线程池技术提供了很好的解决方案。线程池顾名思义就是把线程资源池化,在应用启动时一次性创建合适数量的线程,当需要执行任务时就从线程池中分配一个已经创建好的线程来执行,执行完在把线程归还,只在应用停时再一次性销毁所有的线程。 、线程池的基本组成部分 一个简单的线程池至少包括下列的组成部分: )线程池管理器(ThreadPool):用于创建一个线程池对象并管理线程池,如分配任务给某个空闲线程,查看当前线程状态等等的操作。 )工作线程(WorkThread):线程池中线程,可能是挂起,可能是被分配了任务,若然是挂起,则用一个信号量去阻塞直到有任务分配。 )任务接口(Task):每个任务必须实行的接口,以供工作线程调度任务执行。 、Unix下的线程池实现 将给大家展示的线程池实现的类如下,含有比较多的面向对象设计思想。
主要是
一个线程池管理多个工作线程类,每个工作线程类对象管理一个线程。
1)Mutex:互斥量类,里面只有一个pthread_mutex_t的私有成员,对POSIX互斥量进行封装,后面用于线程池的队列和栈。
Mutex.h
#ifndef MUTEX_H
#define MUTEX_H #include <iostream>
#include <pthread.h>
using namespace std; class Mutex
{
public:
Mutex();
void Lock();
void Unlock(); private:
pthread_mutex_t mutex;
}; #endif
Mutex.cpp
#include "Mutex.h" Mutex::Mutex()
{
pthread_mutex_init(&mutex,NULL);
} void Mutex::Lock()
{
pthread_mutex_lock(&mutex);
} void Mutex::Unlock()
{
pthread_mutex_unlock(&mutex);
}
2)BaseTask:任务的抽象基类,任何具体的任务都要继承该类,并实现自己的void * run()函数,
即线程运行的函数。在这里继承的类定义实现在main函数里面。抽象类不能被实例化,但可以声明为指针指向继承的子类。
BaseTask.h
#ifndef BASE_TASK_H
#define BASE_TASK_H #include <iostream>
using namespace std; class BaseTask
{
public:
virtual void run() = ;
}; #endif
3).MyTask:具体任务类,继承了BaseTask,实现了具体任务。
MyTask.h
#ifndef MYTASK_H
#define MYTASK_H
#include "BaseTask.h" class MyTask : public BaseTask
{
public:
virtual void run(void);
}; #endif
MyTask.cpp
#include "MyTask.h" void MyTask::run(void)
{
cout<<"Hello MyTask"<<endl;
}
4).MyThread: 对POSIX线程的C++封装,实现了执行任务的基本接口
MyThread.h
#ifndef MYTHREAD_H
#define MYTHREAD_H #include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include "BaseTask.h" // 前置定义
class MyThreadPool; class MyThread
{
public:
MyThread(MyThreadPool* mtp);
void Set_Task(BaseTask* task);
void Start_Task();
/* 线程启动函数 必须写成静态成员函数 传入的参数为类对象自己*/
static void* Start_Func(void* arg);
/* 完成一个任务后 询问线程池管理器是否有未完成的任务*/
bool Fetch_Task();
/* 无更多任务 让自己进入线程池栈*/
void Recycle();
private:
/* 用于挂起线程 */
sem_t sem;
pthread_t tid;
BaseTask* task;
MyThreadPool* mtp;
}; #endif
MyThread.cpp
#include "MyThreadPool.h"
#include "MyThread.h" MyThread::MyThread(MyThreadPool* mtp)
{
sem_init(&sem,,);
this->mtp = mtp;
pthread_create(&tid,NULL,Start_Func,(void*)this);
} void MyThread::Set_Task(BaseTask* task)
{
this->task = task;
} void MyThread::Start_Task()
{
sem_post(&sem);
} void* MyThread::Start_Func(void* arg)
{
MyThread* mt = (MyThread*) arg;
while()
{
sem_wait(&mt->sem);
mt->task->run();
if(mt->Fetch_Task())
mt->Start_Task();
else
mt->Recycle();
}
} bool MyThread::Fetch_Task()
{
return mtp->FetchTask(this);
} void MyThread::Recycle()
{
mtp->Recycle(this);
}
MyThreadPool:线程池管理类,实现线程的创建管理和任务调度。
MyThreadPool.h
#ifndef MYTHREADPOOL_H
#define MYTHREADPOOL_H #include <iostream>
#include <stack>
#include <queue>
#include "Mutex.h"
#include "BaseTask.h" class MyThread; class MyThreadPool
{
public:
/* no为要开辟的线程数目*/
MyThreadPool(int no);
/* 添加任务*/
void AddTask(BaseTask* task);
/* 供线程类调用 让线程类对象询问线程池任务队列中是否仍有任务未完成*/
bool FetchTask(MyThread* mt);
/* 回收线程 */
void Recycle(MyThread* mt);
private:
int no;
Mutex smutex;
Mutex qmutex; stack<MyThread*> sthread;
queue<BaseTask*> qtask;
}; #endif
MyThreadPool.cpp
#include "MyThreadPool.h"
#include "MyThread.h" MyThreadPool::MyThreadPool(int no)
{
this->no = no; for(int i=;i<no;++i)
{
sthread.push(new MyThread(this));
}
} void MyThreadPool::AddTask(BaseTask* task)
{
smutex.Lock();
if (!sthread.empty())
{
MyThread* mt = sthread.top();
sthread.pop();
smutex.Unlock();
mt->Set_Task(task);
mt->Start_Task();
}
else
{
smutex.Unlock();
qmutex.Lock();
qtask.push(task);
qmutex.Unlock();
}
} bool MyThreadPool::FetchTask(MyThread* mt)
{
qmutex.Lock();
if (!qtask.empty())
{
mt->Set_Task(qtask.front());
qmutex.Unlock();
return true;
}
else
{
qmutex.Unlock();
return false;
}
} void MyThreadPool::Recycle(MyThread* mt)
{
smutex.Lock();
sthread.push(mt);
smutex.Unlock();
}
main.cpp
#include "MyThread.h"
#include "MyThreadPool.h"
#include "Mutex.h"
#include "MyTask.h" int main()
{
MyThreadPool mtp();
while()
{
BaseTask* task = new MyTask;
mtp.AddTask(task);
sleep();
delete task;
task = NULL;
}
}
makefile
pro: main.cpp libtp.a MyTask.cpp
g++ -lpthread main.cpp -L. -ltp MyTask.cpp -o pro libtp.a: MyThreadPool.o MyThread.o Mutex.o
ar cr libtp.a Mutex.o MyThreadPool.o MyThread.o MyThread.o:MyThread.cpp
g++ -c -lpthread MyThread.cpp -o MyThread.o MyThreadPool.o:MyThreadPool.cpp
g++ -c -lpthread MyThreadPool.cpp -o MyThreadPool.o Mutex.o:Mutex.cpp
g++ -c -lpthread Mutex.cpp -o Mutex.o clean:
rm libtp.a MyThreadPool.o Mutex.o MyThread.o pro
执行 make 成功后,执行./pro 即可
编译: 我首先将线程类(MyThread.o) 互斥类(Mutex.o) 和 线程池类(MyThreadPool.o) 打包成一个静态库
静态库留出两个接口 一个是线程池的初始化 另外一个是用户自己继承BaseTask的run函数后 调用线程池的类对象 AddTask接口去增加任务
所以使用者只需自己指定要开辟的线程数(线程池的构造函数) 和 自定义 任务类对象即可使用这个线程池.
ps:所有文件都放在同一个目录下