池化 - Apache Commons Pool

时间:2022-12-07 17:06:12
  对于那些创建耗时较长,或者资源占用较多的对象,比如网络连接,线程之类的资源,通常使用池化来管理这些对象,从而达到提高性能的目的。比如数据库连接池(c3p0, dbcp), java的线程池 ExecutorService. Apache Commons Pool提供一套池化规范接口,以及实现通用逻辑,我们只需要要实现其抽象出来的方法就可以。Commons Pool主要有以下几个对象
  PooledObject:这个就是前面所说需要池化的资源,被池化的对象可以抽离出共有属性,如,创建时间,状态,最近一次使用时间等
  PooledObjectFactory: 对象工厂,负责对PooledOjbect的创建,状态验证,销毁之类的工作
  ObjectPool: 对象池,它是负责和对象使用者直接打交道的, 对使用者提供获取对象,返还对象接口
  英文不太好的同学可能被这几个对象的命名搞晕,其实世间万物的道理都是相通的。如果把图书馆的书比作PooledObject 的话,那么图书馆就是ObjectPool,图书馆为了管理书,对书添加了入库时间,书借存状态等用于管理的属性。图书馆(ObjectPool)对借书人提供借书,还书的服务(即接口)。而书(PooledObject )的印刷,质量检验,回收(有点不现实)等实实在在的工作还是得交给印刷厂(PooledObjectFactory )来做。其流程关系如下:
池化 - Apache Commons Pool
  下面看看如何使用Commons Pool如何实现自己的对象池。创建自己的对象池大致需要以下工作,
  1. 首先你已经编写好了你的资源对象(这部分不属于池化内容),之后编写实现apache的PooledObjectFactory<T>接口的Factory类,这是编写自己对象池最主要的工作。你的Factory可能需要添加一些用于池化对象的初始化 ,池化对象的验证等参数作为成员变量。
  2. 编写自己的Pool类,让其继承或者内部引用apache的GenericObjectPool<T>,GenericObjectPool实现了ObjectPool接口,已经封装了对池化对象的生命周期管理逻辑
  3. 可选部分,继承apache的GenericObjectPoolConfig,重写构造器,添加一些适合自己业务场景的初始化参数。
  我们以Jedis的源码为例,学习它的实现。我们先看下使用JedisPool操作Redis的简单例子
package com.eg.test.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class TestPool {
public static void main(String[] args) {
//JedisPoolConfig继承apache的GenericObjectPoolConfig,配置Pool的相关参数如下:
JedisPoolConfig config = new JedisPoolConfig();
//如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
config.setMaxTotal(500);
//控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(5);
//表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(30000);;
//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true); JedisPool pool = new JedisPool(config, "192.168.2.191", 8888);
//从pool中获取对象
Jedis jedis = pool.getResource();
String value = jedis.get("someKey"); }
}

  首先看JedisFactory的实现:

class JedisFactory implements PooledObjectFactory<Jedis> {
private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
private final int connectionTimeout;
private final int soTimeout; //省略构造函数,都是一些初始化成员变量的操作 @Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.getDB() != database) {
jedis.select(database);
}
} @Override
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
jedis.quit();
} catch (Exception e) {
}
jedis.disconnect();
} catch (Exception e) {
}
}
} @Override
public PooledObject<Jedis> makeObject() throws Exception {
final HostAndPort hostAndPort = this.hostAndPort.get();
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl,
sslSocketFactory, sslParameters, hostnameVerifier);
try {
jedis.connect();
if (null != this.password) {
jedis.auth(this.password);
}
if (database != 0) {
jedis.select(database);
}
if (clientName != null) {
jedis.clientSetname(clientName);
}
} catch (JedisException je) {
jedis.close();
throw je;
}
return new DefaultPooledObject<Jedis>(jedis);
}
@Override
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
// TODO maybe should select db 0? Not sure right now.
}
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
final BinaryJedis jedis = pooledJedis.getObject();
try {
HostAndPort hostAndPort = this.hostAndPort.get();
String connectionHost = jedis.getClient().getHost();
int connectionPort = jedis.getClient().getPort();
return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort
&& jedis.isConnected() && jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
}
}

  我们看到JedisFactory代码较少,但是逻辑很清晰。该Factory将作为ObjectPool的成员变量,其中四个重写的方法被ObjectPool管理对象生命周期的时候调用。makeobject()方法负责创建Jedis实例,成功调用connect()方法建立有状态的socket连接之后,返回一个包装了jedis的DefaultPooledObject对象,DefaultPooledObject实现了关于统计池化对象状态信息的PooledObject接口。validateObject()方法用于对对象状态的检验,Jedis对象的状态通过socket的ping-pong来验证连接是否正常。destroyObject()方法用来销毁对象,Jedis对象将会断开连接,回收资源。

  再看JedisPool的实现,由于JedisPool继承Pool<T>,所以我们主要看Pool<T>的部分代码:

public abstract class Pool<T> implements Closeable {
protected GenericObjectPool<T> internalPool; public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
} public boolean isClosed() {
return this.internalPool.isClosed();
}
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
if (this.internalPool != null) {
try {
closeInternalPool();
} catch (Exception e) {
}
}
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
public T getResource() {
try {
return internalPool.borrowObject();
} catch (NoSuchElementException nse) {
throw new JedisException("Could not get a resource from the pool", nse);
} catch (Exception e) {
throw new JedisConnectionException("Could not get a resource from the pool", e);
}
}
protected void returnResourceObject(final T resource) {
if (resource == null) {
return;
}
try {
internalPool.returnObject(resource);
} catch (Exception e) {
throw new JedisException("Could not return the resource to the pool", e);
}
} public void addObjects(int count) {
try {
for (int i = 0; i < count; i++) {
this.internalPool.addObject();
}
} catch (Exception e) {
throw new JedisException("Error trying to add idle objects", e);
}
}
}
  JedisPool通过内部引用GenericObjectPool,包装其接口的装饰者模式,相比继承来说这种模式更加灵活。JedisPool的构造方法需要将JedisFactory以及JedisPoolConfig创建标准的ObjectPool作为自己的成员变量。所以pool.getResource()方法的背后还是调用PoolObject.borrowObject()。
 
  最后我们稍微看下JedisPoolConfig,只是做了一些预初始化参数的工作。
public class JedisPoolConfig extends GenericObjectPoolConfig {
public JedisPoolConfig() {
// defaults to make your life with connection pool easier :)
setTestWhileIdle(true);
setMinEvictableIdleTimeMillis(60000);
setTimeBetweenEvictionRunsMillis(30000);
setNumTestsPerEvictionRun(-1);
}
}