Nutshell.ThreadWorkerPool .Net线程池设计

时间:2023-03-10 03:23:19
Nutshell.ThreadWorkerPool .Net线程池设计

功能描述:

  1. 支持创建多个线程池,并统一管理
  2. 支持不同线程池的容量控制,以及最少活动线程的设置
  3. 支持不同线程池中活动线程的闲时设置,即线程空闲时间到期后即自动被回收

结构设计:

  • ThreadWorkerPoolManager: 线程池管理器,用于统一创建,获取,销毁线程池,使用单例模式
  • ThreadWorkerPool: 线程池,用于管理指定数量的线程,由ThreadWorkerPoolManager管理,自身无法创建与销毁
  • TheadWorkerPoolItem: 线程池项,用于包装线程工作器,协助ThreadWorkerPool更好的管理线程,例如取出,放回,闲时的控制
  • TheadWorker: 线程工作器,用于包装系统线程System.Threading.Thread,使其可以重复使用,减少Thrad创建和销毁的性能开销

  结构关系图:

  Nutshell.ThreadWorkerPool .Net线程池设计

 详细设计:

  ThreadWoker

  要点设计:

  1. 完成一次任务后,System.Threading.Thread不能被系统销毁, 默认情况下new Thread(ThreadStart start).Start(), 当ThreadStart委托的任务完成后,系统将销毁该线程,也就是说创建一个System.Threading.Thread实例只能使用一次;为了使线程能被重复使用,ThreadWoker将使用 while+sleeping 的方式对系统线程进行包装,同时使用AutoResetEvent代替Thread.Sleep(timeout)来达到更佳的控制
  2. 闲时设计,线程资源是极其宝贵的系统资源,如果线程池中存在大量的空闲线程这是一种浪费,极端情况下将影响系统的稳定性和工作效率;ThreadWorker将使用AutoResetEvent和事件通知的方式来代替在线程池中定期轮询检查的方式,每完成一个任务将重新开始空闲时间的计算,如果ThreadWorker在线程池中被取出,那么ThreadWorker空闲时间将永远不会到期,直到ThreadWorker被返回线程池后才重新开始空闲时间的计算

  状态图:

  Nutshell.ThreadWorkerPool .Net线程池设计

  关键代码:

  

         private void ThreadWorking()
{
while (_status != ThreadWorkerStatus.Abort)
{
//WaitOne 返回false表示等待超时,true接到取消等待的通知
//这里利用AutoResetEvent.WaitOne的特性来设计闲时控制,false表示空闲到期,true表示新的任务开始
if (!_waitEvent.WaitOne(_idleTime))
{
if (!_isCanIdleExpired) //_isCanIdleExpired变量控制是否允许超时,例如被取出后将不能超时
continue; _status = ThreadWorkerStatus.Abort;
_waitEvent.Close();
_waitEvent.Dispose();
if (OnIdleExpired != null)
OnIdleExpired(this, null); //空闲到期事件通知
return;
}
else if (_status == ThreadWorkerStatus.Abort)
return; try
{
Working();
}
catch (Exception ex)
{
_logger.TraceEvent(TraceEventType.Error, (int)TraceEventType.Error, ex.ToString());
}
finally
{
_status = ThreadWorkerStatus.Idle;
if (OnWorkCompleted != null)
OnWorkCompleted(this, null); //任务完成事件通知
}
}
}
      public void Work()
{
if (_status == ThreadWorkerStatus.Abort)
throw new InvalidOperationException("this ThreadWorker was Abort!"); if (_status == ThreadWorkerStatus.Working)
throw new InvalidOperationException("this ThreadWorker was working, unable to duplicate work!"); _status = ThreadWorkerStatus.Working;
_waitEvent.Set(); //通知线程有个新的工作要开始
}

  ThreadWorkerPoolItem

  要点设计:

  1. 链接ThreadWorker和线程池,线程池通过ThreadWorkerPoolItem控制ThreadWorker在线程池的取出,放回,销毁
  2. 通过订阅ThreadWorker的空闲到期事件OnIdleExpired,来完成线程池对线程的移除
  3. 通过订阅ThreadWorker的任务完成事件OnWorkCompleted,来完成线程返回线程池的操作
  4. 提供剩余空闲时间查询,来为线程池提供更优线程取出方案

  完整代码:

     public sealed class ThreadWorkerPoolItem
{
private ThreadWorkerPoolItemStatus _status;
private readonly ThreadWorkerBase _threadWorker;
private readonly ThreadWorkerPoolBase _threadWorkerPool;
private readonly int _idleTime;
private DateTime _startIdleTime; internal ThreadWorkerPoolItem(ThreadWorkerPoolBase pool, ThreadWorkerBase threadWorker, ThreadWorkerPoolSettings poolSettings)
{
_threadWorkerPool = pool;
_threadWorker = threadWorker;
_threadWorker.OnIdleExpired += _threadWorker_OnIdleExpired;
_threadWorker.OnWorkCompleted += _threadWorker_OnWorkCompleted;
_threadWorker.Start();
_status = ThreadWorkerPoolItemStatus.Idle;
_idleTime = poolSettings.IdleTime;
} void _threadWorker_OnWorkCompleted(object sender, EventArgs args)
{
_threadWorkerPool.Return(this);
} void _threadWorker_OnIdleExpired(object sender, EventArgs args)
{
_threadWorkerPool.Remove(this);
} internal ThreadWorkerPoolItemStatus Status
{
get
{
if (_threadWorker.Status == ThreadWorkerStatus.Abort || _status == ThreadWorkerPoolItemStatus.Abort)
return ThreadWorkerPoolItemStatus.Abort; return _status;
}
} internal int SurplusIdleTime
{
get
{
if (_status == ThreadWorkerPoolItemStatus.Take || _idleTime == -)
return -; int idledTime = (int)(_startIdleTime - DateTime.Now).TotalMilliseconds;
if (idledTime >= _idleTime)
return ; return idledTime;
}
} internal void SetTake()
{
_threadWorker.IsCanIdleExpried = false;
_status = ThreadWorkerPoolItemStatus.Take;
} internal void SetIdle()
{
_startIdleTime = DateTime.Now;
_status = ThreadWorkerPoolItemStatus.Idle;
_threadWorker.IsCanIdleExpried = true;
} internal ThreadWorkerBase ThreadWorker
{
get { return _threadWorker; }
}
}

  ThreadWorkerPool

  要点设计:

  1. 使用Lock配合ThreadWorkerPoolItem的状态来确保多线程下,每次取出的都是空闲的ThreadWorker
  2. 取出的超时设计,由于线程池有容量控制,高并发下必然导致线程池满负荷,提供超时设置,有利于使用者自行控制满负荷情况下的处理;ThreadWorkerPool将使用while+sleeping的方式,同时使用AutoResetEvent代替Thread.Sleep(timeout)来达到更佳的控制,当一个线程被放回线程池时,另一等待获取者立即获取,而无需等待下一次轮询的到来

  关键代码:

         protected bool TryTake(int timeout, out ThreadWorkerBase threadWorker)
{
threadWorker = null;
lock (_takeLocker)
{
ThreadWorkerPoolItem worker = null;
DateTime startWaitTime;
while (!_isDestoryed)
{
worker = _threadWorkerList.Where(e => e.Status == Core.ThreadWorkerPoolItemStatus.Idle).OrderByDescending(e => e.SurplusIdleTime).FirstOrDefault();
if (worker == null)
{
if (_threadWorkerList.Count < _settings.MaxThreadWorkerCount)
{
worker = this.CreatePoolItem(_threadWorkerList.Count + , _settings.IdleTime);
worker.SetTake();
_threadWorkerList.Add(worker);
threadWorker = worker.ThreadWorker;
return true;
} startWaitTime = DateTime.Now;
if (!_takeWaitEvent.WaitOne(timeout))
{
threadWorker = null;
return false;
} if (timeout != -)
{
timeout = timeout - (int)(DateTime.Now - startWaitTime).TotalMilliseconds;
if (timeout <= )
{
threadWorker = null;
return false;
}
}
continue;
} threadWorker = worker.ThreadWorker;
worker.SetTake();
return true;
} threadWorker = null;
return false;
}
}
         internal void Return(ThreadWorkerPoolItem item)
{
item.SetIdle();
_takeWaitEvent.Set();
}

  ThreadWorkerPoolManager使用单例模式管理,代码过于简单这里就不贴了......

  有兴趣的同学可以点击这里进行下载源码查看:Nutshell.ThreadWorkerPool.zip

  github 开源地址: https://github.com/zcylife/Nutshell