通用对象池ObjectPool的一种简易设计和实现方案

时间:2023-03-09 02:22:47
通用对象池ObjectPool的一种简易设计和实现方案

对象池,最简单直接的作用当然是通过池来减少创建和销毁对象次数,实现对象的缓存和复用。我们熟知的线程池、数据库连接池、TCP连接池等等都是非常典型的对象池。

一个基本的简易对象池的主要功能实现我认为应该至少包括以下三点:

1、对象的分配、销毁等管理策略

2、线程安全

3、性能

按照主要的常用功能,我们大致可以抽象出以下泛型接口IObjectPool<T>:

通用对象池ObjectPool的一种简易设计和实现方案通用对象池ObjectPool的一种简易设计和实现方案
IObjectPoolusing System;

/// <summary>
/// Simple ObjectPool Interface
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IObjectPool<T> : IDisposable
{
#region Properties /// <summary>
/// Minimum enabled object count
/// </summary>
int MinObjCount { get; } /// <summary>
/// Maximum enabled object count
/// </summary>
int MaxObjCount { get; } /// <summary>
/// Current enabled object count
/// </summary>
int CurrentObjCount { get; } #endregion #region Methods /// <summary>
/// Gets an item from the pool.
/// </summary>
/// <returns>The removed or created item.</returns>
/// <remarks>If the pool is empty, a new item will be created and returned.</remarks>
T GetObject(); /// <summary>
/// Adds the provided item into the pool.
/// </summary>
/// <param name="item">The item to be added.</param>
void PutObject(T item); #endregion }

具体实现这个接口的时候,考虑到线程安全,同时为了降低线程安全控制的种种风险,我们使用framework4.0中提供的线程安全容器,示例代码中选择非常适合这种场景的线程安全队列ConcurrentQueue(时间和空间复杂度都非常合适):

通用对象池ObjectPool的一种简易设计和实现方案通用对象池ObjectPool的一种简易设计和实现方案
ObjectPoolusing System;
using System.Collections.Concurrent; public class ObjectPool<T> : IObjectPool<T> where T : new()
{
#region 字段 private readonly IProducerConsumerCollection<T> collection; private int _minObjCount = 0; private int _maxObjCount = 0; #endregion #region 构造函数 public ObjectPool()
: this(1, 8)
{
} public ObjectPool(int minObjCount, int maxObjCount)
{
if (minObjCount < 1)
{
throw new ArgumentException("minObjCount cannot be less than zero");
}
if (maxObjCount < 1)
{
throw new ArgumentException("maxObjCount cannot be less than zero");
}
if (maxObjCount < minObjCount)
{
throw new ArgumentException("maxObjCount cannot be less than minObjCount");
} this._minObjCount = minObjCount;
this._maxObjCount = maxObjCount;
collection = new ConcurrentQueue<T>();
var objCount = (minObjCount + maxObjCount) / 2;
for (int i = 0; i < objCount; i++) //初始化的对象个数取折中方案
{
var item = new T();
collection.TryAdd(item);
}
} #endregion #region 属性 public int MinObjCount { get { return _minObjCount; } } public int MaxObjCount { get { return _maxObjCount; } } public int CurrentObjCount
{
get
{
if (collection != null)
{
return collection.Count;
}
return 0;
}
} #endregion #region 方法 public T GetObject()
{
T item = default(T);
var isOK = collection.TryTake(out item); //如取出对象为空 此处采用悲观对象分配方案,直接创建一个新对象,保证有对象可用
if (item == null || isOK == false)
{
item = new T();
this.PutObject(item); //新创建的对象放入容器中
Console.WriteLine("新创建的对象放入容器中");
}
return item;
} public void PutObject(T item)
{
if (item == null)
{
return;
}
collection.TryAdd(item);
//TryTrimToMax(); //清除大于队列最大可用的项
} public void Dispose()
{
} #endregion #region 辅助方法 /// <summary>
/// 清除大于队列最大可用的项
/// </summary>
private void TryTrimToMax()
{
while (this.collection.Count > this.MaxObjCount)
{
var item = default(T);
this.collection.TryTake(out item);
Console.WriteLine("清除了大于队列最大可用的项");
}
} #endregion }

其中,GetObject方法即对象的获取至少有两种策略:

1、悲观对象创建策略
这种策略在获取对象时,“悲观”地认为当对象池中暂无可用对象时,等待是会发生很久时间的,必须保证能够返回一个可用对象,所以不如去掉等待直接创建一个对象返回。虽然并发较高的场景下这有可能造成系统中对象总数暂时超出了最大对象数量限制,但好处是我们可以保证系统有足够的对象可用,不会因为没可用对象或等待创建可用对象而使上层逻辑受阻。
对于对象超过最大对象个数限制的情况,我们完全可以对外再暴露一个接口方法(示例为TryTrimToMax),当系统并发压力减轻时,可调用该方法清理多余的对象。
上面ObjectPool示例代码中我们采取的就是悲观创建对象的方案。

2、乐观对象创建策略

这种策略“乐观”地认为,当对象池中暂无可用对象时,所有对象的消费者会在用完对象后及时的返回对象池中。
如果对象池中无可用对象,那么当前请求者可能会选择Sleep或者Wait或WaitOne等方式等待。如果更乐观的话,还可以采用类似lock-free的方式多尝试去获取对象(不完全是CAS的那种lock-free),直到获取对象为止。
这种方式的优点显而易见,它的空间使用率高,不会造成对象超过最大上限。但它的缺点也很明显,在一个高并发的可用对象已经供不应求的环境下,这种策略多数实现都不是wait-free的,没有获取到对象的请求调用只能排队等待,这就极大地降低了系统吞吐量,原来的对象池是为了提高性能的,现在则成了系统性能瓶颈的重要原因。

上述示例代码中,泛型参数T有where限制,可通过Func委托间接去掉这个限制,当然构造函数会略微变得复杂:

通用对象池ObjectPool的一种简易设计和实现方案通用对象池ObjectPool的一种简易设计和实现方案
ObjectPoolusing System;
using System.Collections.Concurrent; public class ObjectPool<T> : IObjectPool<T>
{
#region 字段 private readonly IProducerConsumerCollection<T> collection; private readonly Func<T> _generator; private int _minObjCount = 0; private int _maxObjCount = 0; #endregion #region 构造函数 public ObjectPool(Func<T> generator)
: this(generator, 1, 8)
{
} public ObjectPool(Func<T> generator, int minObjCount, int maxObjCount)
{
if (generator == null)
{
throw new ArgumentNullException("generator cannot be null");
}
if (minObjCount < 1)
{
throw new ArgumentException("minObjCount cannot be less than zero");
}
if (maxObjCount < 1)
{
throw new ArgumentException("maxObjCount cannot be less than zero");
}
if (maxObjCount < minObjCount)
{
throw new ArgumentException("maxObjCount cannot be less than minObjCount");
}
this._generator = generator;
this._minObjCount = minObjCount;
this._maxObjCount = maxObjCount;
collection = new ConcurrentQueue<T>();
var objCount = (minObjCount + maxObjCount) / 2;
for (int i = 0; i < objCount; i++) //初始化的对象个数取折中方案
{
var item = _generator();
collection.TryAdd(item);
}
} #endregion #region 属性 public int MinObjCount { get { return _minObjCount; } } public int MaxObjCount { get { return _maxObjCount; } } public int CurrentObjCount
{
get
{
if (collection != null)
{
return collection.Count;
}
return 0;
}
} #endregion #region 方法 public T GetObject()
{
T item = default(T);
var isOK = collection.TryTake(out item); //如取出对象为空 此处采用悲观对象分配方案,直接创建一个新对象,保证有对象可用
if (item == null || isOK == false)
{
item = _generator();
this.PutObject(item); //新创建的对象放入容器中
Console.WriteLine("新创建的对象放入容器中");
}
return item;
} public void PutObject(T item)
{
if (item == null)
{
return;
}
collection.TryAdd(item);
//TryTrimToMax(); //清除大于队列最大可用的项
} public void Dispose()
{
} #endregion #region 辅助方法 /// <summary>
/// 清除大于队列最大可用的项
/// </summary>
private void TryTrimToMax()
{
while (this.collection.Count > this.MaxObjCount)
{
var item = default(T);
this.collection.TryTake(out item);
Console.WriteLine("清除了大于队列最大可用的项");
}
} #endregion }

可通过下列代码进行测试验证:

通用对象池ObjectPool的一种简易设计和实现方案通用对象池ObjectPool的一种简易设计和实现方案
ObjectPoolTestusing System;
using System.Threading;
using System.Threading.Tasks; class UserService
{
public void CheckUser(int i)
{
//Console.WriteLine("check user:{0}", i);
Thread.Sleep(1);
}
} public class Program
{
static void Main(string[] args)
{
TestObjectPool(); for (int a = 0; a < 10; a++)
{
Console.WriteLine("测试:{0}", a + 1);
TestObjectPool();
} Console.Read();
} static void TestObjectPool()
{
var recordCount = 10000;
using (var pool = new ObjectPool<UserService>(() => { return new UserService(); }, 1, 16))
//using (var pool = new ObjectPool<UserService>(() => { return new UserService(); }, 1, 128))
{
Console.WriteLine("Init:current obj count:{0}", pool.CurrentObjCount);
Parallel.For(0, recordCount, i =>
{
var service = pool.GetObject();
service.CheckUser(i);
Thread.Sleep(2);
pool.PutObject(service);
}); Console.WriteLine("current obj count:{0}", pool.CurrentObjCount);
Console.WriteLine("min obj count:{0}", pool.MinObjCount);
Console.WriteLine("max obj count:{0}", pool.MaxObjCount);
Console.WriteLine("====================");
}
}
}

本文的对象池实现较为简洁,只有基本的存取功能,其他功能如对象的销毁等可以继续扩展。还要感慨下幸好有了线程安全容器,果然可以成倍地提高生产效率,而且代码更加简洁优雅。

参考:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/13/9990427.aspx

http://code.msdn.microsoft.com/parextsamples