一个Windows C++的线程池的实现

时间:2023-03-08 17:16:36

此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

http://blog.****.net/huyiyang2010/archive/2010/08/10/5801597.aspx

ThreadPoolExecutor.h

  1. #ifndef __THREAD_POOL_EXECUTOR__
  2. #define __THREAD_POOL_EXECUTOR__
  3. #include "Thread.h"
  4. #include <set>
  5. #include <list>
  6. #include <windows.h>
  7. class CThreadPoolExecutor
  8. {
  9. public:
  10. CThreadPoolExecutor(void);
  11. ~CThreadPoolExecutor(void);
  12. /**
  13. 初始化线程池,创建minThreads个线程
  14. **/
  15. bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);
  16. /**
  17. 执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true
  18. 若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true
  19. 若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false
  20. **/
  21. bool Execute(Runnable * pRunnable);
  22. /**
  23. 终止线程池,先制止塞入任务,
  24. 然后等待直到任务列表为空,
  25. 然后设置最小线程数量为0,
  26. 等待直到线程数量为空,
  27. 清空垃圾堆中的任务
  28. **/
  29. void Terminate();
  30. /**
  31. 返回线程池中当前的线程数量
  32. **/
  33. unsigned int GetThreadPoolSize();
  34. private:
  35. /**
  36. 获取任务列表中的任务,若任务列表为空,返回NULL
  37. **/
  38. Runnable * GetTask();
  39. static unsigned int WINAPI StaticThreadFunc(void * arg);
  40. private:
  41. class CWorker : public CThread
  42. {
  43. public:
  44. CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);
  45. ~CWorker();
  46. void Run();
  47. private:
  48. CThreadPoolExecutor * m_pThreadPool;
  49. Runnable * m_pFirstTask;
  50. volatile bool m_bRun;
  51. };
  52. typedef std::set<CWorker *> ThreadPool;
  53. typedef std::list<Runnable *> Tasks;
  54. typedef Tasks::iterator TasksItr;
  55. typedef ThreadPool::iterator ThreadPoolItr;
  56. ThreadPool m_ThreadPool;
  57. ThreadPool m_TrashThread;
  58. Tasks m_Tasks;
  59. CRITICAL_SECTION m_csTasksLock;
  60. CRITICAL_SECTION m_csThreadPoolLock;
  61. volatile bool m_bRun;
  62. volatile bool m_bEnableInsertTask;
  63. volatile unsigned int m_minThreads;
  64. volatile unsigned int m_maxThreads;
  65. volatile unsigned int m_maxPendingTasks;
  66. };
  67. #endif

ThreadPoolExecutor.cpp

  1. #include "ThreadPoolExecutor.h"
  2. CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :
  3. m_pThreadPool(pThreadPool),
  4. m_pFirstTask(pFirstTask),
  5. m_bRun(true)
  6. {
  7. }
  8. CThreadPoolExecutor::CWorker::~CWorker()
  9. {
  10. }
  11. /**
  12. 执行任务的工作线程。
  13. 当前没有任务时,
  14. 如果当前线程数量大于最小线程数量,减少线程,
  15. 否则,执行清理程序,将线程类给释放掉
  16. **/
  17. void CThreadPoolExecutor::CWorker::Run()
  18. {
  19. Runnable * pTask = NULL;
  20. while(m_bRun)
  21. {
  22. if(NULL == m_pFirstTask)
  23. {
  24. pTask = m_pThreadPool->GetTask();
  25. }
  26. else
  27. {
  28. pTask = m_pFirstTask;
  29. m_pFirstTask = NULL;
  30. }
  31. if(NULL == pTask)
  32. {
  33. EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
  34. if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)
  35. {
  36. ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
  37. if(itr != m_pThreadPool->m_ThreadPool.end())
  38. {
  39. m_pThreadPool->m_ThreadPool.erase(itr);
  40. m_pThreadPool->m_TrashThread.insert(this);
  41. }
  42. m_bRun = false;
  43. }
  44. else
  45. {
  46. ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
  47. while(itr != m_pThreadPool->m_TrashThread.end())
  48. {
  49. (*itr)->Join();
  50. delete (*itr);
  51. m_pThreadPool->m_TrashThread.erase(itr);
  52. itr = m_pThreadPool->m_TrashThread.begin();
  53. }
  54. }
  55. LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
  56. continue;
  57. }
  58. else
  59. {
  60. pTask->Run();
  61. pTask = NULL;
  62. }
  63. }
  64. }
  65. /////////////////////////////////////////////////////////////////////////////////////////////
  66. CThreadPoolExecutor::CThreadPoolExecutor(void) :
  67. m_bRun(false),
  68. m_bEnableInsertTask(false)
  69. {
  70. InitializeCriticalSection(&m_csTasksLock);
  71. InitializeCriticalSection(&m_csThreadPoolLock);
  72. }
  73. CThreadPoolExecutor::~CThreadPoolExecutor(void)
  74. {
  75. Terminate();
  76. DeleteCriticalSection(&m_csTasksLock);
  77. DeleteCriticalSection(&m_csThreadPoolLock);
  78. }
  79. bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
  80. {
  81. if(minThreads == 0)
  82. {
  83. return false;
  84. }
  85. if(maxThreads < minThreads)
  86. {
  87. return false;
  88. }
  89. m_minThreads = minThreads;
  90. m_maxThreads = maxThreads;
  91. m_maxPendingTasks = maxPendingTasks;
  92. unsigned int i = m_ThreadPool.size();
  93. for(; i<minThreads; i++)
  94. {
  95. //创建线程
  96. CWorker * pWorker = new CWorker(this);
  97. if(NULL == pWorker)
  98. {
  99. return false;
  100. }
  101. EnterCriticalSection(&m_csThreadPoolLock);
  102. m_ThreadPool.insert(pWorker);
  103. LeaveCriticalSection(&m_csThreadPoolLock);
  104. pWorker->Start();
  105. }
  106. m_bRun = true;
  107. m_bEnableInsertTask = true;
  108. return true;
  109. }
  110. bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
  111. {
  112. if(!m_bEnableInsertTask)
  113. {
  114. return false;
  115. }
  116. if(NULL == pRunnable)
  117. {
  118. return false;
  119. }
  120. if(m_Tasks.size() >= m_maxPendingTasks)
  121. {
  122. if(m_ThreadPool.size() < m_maxThreads)
  123. {
  124. CWorker * pWorker = new CWorker(this, pRunnable);
  125. if(NULL == pWorker)
  126. {
  127. return false;
  128. }
  129. EnterCriticalSection(&m_csThreadPoolLock);
  130. m_ThreadPool.insert(pWorker);
  131. LeaveCriticalSection(&m_csThreadPoolLock);
  132. pWorker->Start();
  133. }
  134. else
  135. {
  136. return false;
  137. }
  138. }
  139. else
  140. {
  141. EnterCriticalSection(&m_csTasksLock);
  142. m_Tasks.push_back(pRunnable);
  143. LeaveCriticalSection(&m_csTasksLock);
  144. }
  145. return true;
  146. }
  147. Runnable * CThreadPoolExecutor::GetTask()
  148. {
  149. Runnable * Task = NULL;
  150. EnterCriticalSection(&m_csTasksLock);
  151. if(!m_Tasks.empty())
  152. {
  153. Task = m_Tasks.front();
  154. m_Tasks.pop_front();
  155. }
  156. LeaveCriticalSection(&m_csTasksLock);
  157. return Task;
  158. }
  159. unsigned int CThreadPoolExecutor::GetThreadPoolSize()
  160. {
  161. return m_ThreadPool.size();
  162. }
  163. void CThreadPoolExecutor::Terminate()
  164. {
  165. m_bEnableInsertTask = false;
  166. while(m_Tasks.size() > 0)
  167. {
  168. Sleep(1);
  169. }
  170. m_bRun = false;
  171. m_minThreads = 0;
  172. m_maxThreads = 0;
  173. m_maxPendingTasks = 0;
  174. while(m_ThreadPool.size() > 0)
  175. {
  176. Sleep(1);
  177. }
  178. EnterCriticalSection(&m_csThreadPoolLock);
  179. ThreadPoolItr itr = m_TrashThread.begin();
  180. while(itr != m_TrashThread.end())
  181. {
  182. (*itr)->Join();
  183. delete (*itr);
  184. m_TrashThread.erase(itr);
  185. itr = m_TrashThread.begin();
  186. }
  187. LeaveCriticalSection(&m_csThreadPoolLock);
  188. }

用法:

#include "Thread.h"
#include "ThreadPoolExecutor.h"

class R : public Runnable
{
public:
    ~R()
    {
    }
    void Run()
    {
        printf("Hello World/n");
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
    pExecutor->Init(1, 10, 50);
    R r;
    for(int i=0;i<100;i++)
    {
        while(!pExecutor->Execute(&r))
        {
        }
    }
    pExecutor->Terminate();
    delete pExecutor;
    getchar();
    return 0;
}

测试结果:

机器:

Intel(R) Core(TM)2 Duo CPU

E8400 @ 3.00GHz

2G内存

对于100个任务并且每个任务包含10000000个循环,任务中无等待:

单线程执行耗时:2281时间片

单线程池执行耗时:2219时间片

2个线程的线程池耗时:1156时间片

5个线程的线程池耗时:1166时间片

10个线程的线程池耗时:1157时间片

100个线程的线程池耗时:1177时间片

from: