LINUX下的简单线程池

时间:2024-03-31 09:37:20

前言

任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能、代码的可读性、代码的可维护性、程序的开发效率等。

线程池适用场合:任务比较多,需要拉起大量线程来处理;任务的处理时间相对比较短,按照线程的周期T1(创建阶段)、T2(执行阶段)、T3(销毁阶段)来算,执行阶段仅占用较少时间。

简单的线程池通常有以下功能:预创建一定数量的线程;管理线程任务,当工作线程没有事情可做时休眠自己;销毁线程池。

复杂一些的线程池有额外的调节功能:管理线程池的上限;动态调节工作线程数量,当大量工作请求到来时增加工作线程,工作请求较少时销毁部分线程。

内容部分

这次实现的是一个简单的线程池模型。

首先是线程池的头文件定义:

 1 #include<unistd.h>
 2 #include<stdlib.h>
 3 #include<iostream>
 4 #include<string>
 5 #include<string.h>
 6 #include<queue>
 7 #include<errno.h>
 8 #include<pthread.h>
 9 using namespace std;
10 
11 struct  thread_work
12 {   
13     void* (*routine)(void*);
14     void* arg;
15 };
16 
17 struct thread_pool
18 {   
19     bool ShutDown;
20     unsigned int iMaxThreadNum;
21     pthread_mutex_t pool_mutex;
22     pthread_cond_t pool_cond;
23     queue<pthread_t> Pth_IdQueue;
24     queue<thread_work> Pth_workQueue;
25 };
26 
27 thread_pool* thread_pool_create(int iThreadNum);
28 void thread_pool_destroy(thread_pool* thpool);
29 int thread_pool_add_task(thread_pool* thpool, void*(*routine)(void*), void* arg);
30 void* thread_routine(void* arg);

下面是线程池的实现:

  1 #include "ThreadPool.h"
  2 
  3 thread_pool* thread_pool_create(int iThreadMaxNum)
  4 {
  5     int iRet = 0;
  6     thread_pool* pool = new thread_pool;
  7     if(NULL == pool)
  8     {
  9         cout << "new thread_pool failed! procedure exit" << endl;
 10         return NULL;
 11     }
 12     pool->iMaxThreadNum = iThreadMaxNum;
 13     pool->ShutDown = false;
 14     if((iRet=pthread_mutex_init(&pool->pool_mutex, NULL)) != 0)
 15     {
 16         cout << __FUNCTION__ << "thread_pool init failed! error: " << strerror(iRet)  << endl;
 17         delete pool;
 18         return NULL;
 19     }
 20     if((iRet=pthread_cond_init(&pool->pool_cond, NULL)) != 0)
 21     {
 22         cout << __FUNCTION__ << "thread_cond init failed! error: " << strerror(iRet) << endl;
 23         delete pool;
 24         return NULL;
 25     }
 26     
 27     for(int i = 0; i < iThreadMaxNum; i++)
 28     {
 29         pthread_t thid = 0;
 30         if((iRet=pthread_create(&thid, NULL, thread_routine, (void*)pool)) != 0)
 31         {
 32             cout << __FUNCTION__ << " pthread_create failed, error: " << strerror(iRet) << endl;
 33             delete pool;
 34             return NULL;
 35         }
 36         else
 37         {
 38             pool->Pth_IdQueue.push(thid);
 39         }
 40     }
 41     return pool;
 42 }
 43 
 44 int thread_pool_add_task(thread_pool* thpool, void*(*routine)(void*), void* arg)
 45 {
 46     if(routine == NULL  || thpool == NULL)
 47         return -1;
 48     thread_work temp;
 49     temp.routine = routine;
 50     temp.arg = arg;
 51     pthread_mutex_lock(&(thpool->pool_mutex));
 52     thpool->Pth_workQueue.push(temp);
 53     pthread_mutex_unlock(&(thpool->pool_mutex));
 54     pthread_cond_signal(&(thpool->pool_cond));
 55     return 0;
 56 }
 57 
 58 void thread_pool_destroy(thread_pool* thpool)
 59 {
 60     if(thpool == NULL)
 61         return;
 62     pthread_mutex_lock(&(thpool->pool_mutex));
 63     thpool->ShutDown = true;
 64     while(!thpool->Pth_workQueue.empty())
 65     {
 66         thpool->Pth_workQueue.pop();
 67     }
 68     pthread_cond_broadcast(&(thpool->pool_cond));
 69     pthread_mutex_unlock(&(thpool->pool_mutex));
 70     while(!thpool->Pth_IdQueue.empty())
 71     {
 72         pthread_join(thpool->Pth_IdQueue.front(), NULL);
 73         thpool->Pth_IdQueue.pop();
 74     }
 75     pthread_mutex_destroy(&(thpool->pool_mutex));
 76     pthread_cond_destroy(&(thpool->pool_cond));
 77     delete thpool;
 78 }
 79 
 80 void* thread_routine(void* pool)
 81 {
 82     if(pool == NULL)
 83     {
 84         cout << "thread_routine params is empty, thread_exit" << endl;
 85         return NULL;
 86     }
 87     while(1)
 88     {
 89         thread_pool* p = (thread_pool*)pool;
 90         pthread_mutex_lock(&(p->pool_mutex));
 91         while(p->Pth_workQueue.empty() && !p->ShutDown)
 92         {
 93             pthread_cond_wait(&(p->pool_cond), &(p->pool_mutex));
 94         }
 95         if(p->ShutDown)
 96         {
 97             pthread_mutex_unlock(&(p->pool_mutex));
 98             return NULL;
 99         }
100         thread_work work_f;
101         work_f = p->Pth_workQueue.front();
102         p->Pth_workQueue.pop();
103         pthread_mutex_unlock(&(p->pool_mutex));
104         work_f.routine(work_f.arg);
105     }
106 }

代码都是基础知识,大家应该都能理解。

下面是一个测试的demo:

 1 #include "ThreadPool.h"
 2 
 3 void* task(void* p)
 4 {
 5     int ptI = *(int*)p;
 6     //cout << "ptr address : " << p << "ptr value : " << ptI << endl;
 7     cout << ptI << endl;
 8     return NULL;
 9 }
10 
11 int main()
12 {
13     struct thread_pool* pool = thread_pool_create(100);
14     int iArg[1000];
15     for(int i = 0; i < 1000; i++)
16     {
17         iArg[i] = i;
18         thread_pool_add_task(pool, task, &iArg[i]);
19         //cout << "thread_pool_add_task id: " << iArg[i] << endl;
20         //cout << __FUNCTION__ << " success, current task id is: " << iArg[i]  << "iArg[" << i << "] address : " << &iArg[i]  << endl;
21     }
22     thread_pool_destroy(pool);    
23 }

小结

很简单很好理解的一段代码,我却写了大半天时间,手残党鉴定完毕。