java 连接池的简单实现

时间:2023-03-09 02:22:51
java 连接池的简单实现

  最近一个项目中需要自己写个连接池, 写了一个下午,挺辛苦的,但不知道会不会出问题, 所以,贴到博客上,欢迎各路大神指点

1. 配置信息:

/**
*
*/
package cn.mjorcen.db.bean; import java.util.ResourceBundle; import org.apache.log4j.Logger; /**
*
* 配置信息
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:02:56 PM
* @version 1
*/
public class Configuration {
private ResourceBundle resource;
private Logger logger = Logger.getLogger(getClass());
private String driverClassName = "com.mysql.jdbc.Driver";
private String validationQuery = "SELECT 1";
private String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";
private String user = "root";
private String password = "";
private int initialPoolSize = 3;
private int minPoolSize = 3;
private int maxPoolSize = 10;
private int maxStatements = 30;
private int maxIdleTime = 25000;
private int idleConnectionTestPeriod = 18000;
private int connectionLonger = 3600000; public Configuration() {
super(); } public Configuration(String _properties) {
super();
init(_properties);
} /**
*
* @param _properties
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:08:54 PM
* @version 1
*/
private void init(String _properties) {
resource = ResourceBundle.getBundle(_properties);
try {
String tmp = "";
setDriverClassName(resource.getString("driverClassName"));
setValidationQuery(resource.getString("validationQuery"));
setUrl(resource.getString("jdbc_url"));
setUser(resource.getString("jdbc_username"));
setPassword(resource.getString("jdbc_password")); tmp = resource.getString("initialPoolSize");
if (tmp != null) {
setInitialPoolSize(Integer.parseInt(tmp));
}
tmp = resource.getString("minPoolSize");
if (tmp != null) {
setMinPoolSize(Integer.parseInt(tmp));
}
tmp = resource.getString("maxPoolSize");
if (tmp != null) {
setMaxPoolSize(Integer.parseInt(tmp));
}
tmp = resource.getString("maxStatements");
if (tmp != null) {
setMaxStatements(Integer.parseInt(tmp));
}
tmp = resource.getString("maxIdleTime");
if (tmp != null) {
setMaxIdleTime(Integer.parseInt(tmp));
}
tmp = resource.getString("idleConnectionTestPeriod");
if (tmp != null) {
setIdleConnectionTestPeriod(Integer.parseInt(tmp));
}
tmp = resource.getString("connectionLonger");
if (tmp != null) {
setConnectionLonger(Integer.parseInt(tmp));
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
} } public ResourceBundle getResource() {
return resource;
} public void setResource(ResourceBundle resource) {
this.resource = resource;
} public String getDriverClassName() {
return driverClassName;
} public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
} public String getValidationQuery() {
return validationQuery;
} public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
} public String getUrl() {
return url;
} public void setUrl(String url) {
this.url = url;
} public String getUser() {
return user;
} public void setUser(String user) {
this.user = user;
} public String getPassword() {
return password;
} public void setPassword(String password) {
this.password = password;
} public int getInitialPoolSize() {
return initialPoolSize;
} public void setInitialPoolSize(int initialPoolSize) {
this.initialPoolSize = initialPoolSize;
} public int getMinPoolSize() {
return minPoolSize;
} public void setMinPoolSize(int minPoolSize) {
this.minPoolSize = minPoolSize;
} public int getMaxPoolSize() {
return maxPoolSize;
} public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
} public int getMaxStatements() {
return maxStatements;
} public void setMaxStatements(int maxStatements) {
this.maxStatements = maxStatements;
} public int getMaxIdleTime() {
return maxIdleTime;
} public void setMaxIdleTime(int maxIdleTime) {
this.maxIdleTime = maxIdleTime;
} public int getIdleConnectionTestPeriod() {
return idleConnectionTestPeriod;
} public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod) {
this.idleConnectionTestPeriod = idleConnectionTestPeriod;
} public int getConnectionLonger() {
return connectionLonger;
} public void setConnectionLonger(int connectionLonger) {
this.connectionLonger = connectionLonger;
} }

2. connection 的包装类, 因为mysql 一个连接连接8小时就会被mysql 干掉;所以出此下策;

/**
*
*/
package cn.mjorcen.db.bean; import java.sql.Connection;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; /**
*
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 4:27:30 PM
* @version 1
*/
public class WarpConnection {
private Logger logger = Logger.getLogger(getClass());
static private AtomicInteger atomicInteger = new AtomicInteger(0);
private String name;
private long connectionTime;
private long lastWorkTime;
private Connection connection; public long getConnectionTime() {
return connectionTime;
} public void setConnectionTime(long connectionTime) {
this.connectionTime = connectionTime;
} public Connection getConnection() {
return connection;
} public void setConnection(Connection connection) {
this.connection = connection;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public static WarpConnection warp(Connection connection) {
WarpConnection warpConnection = new WarpConnection();
warpConnection.setConnection(connection);
warpConnection.setConnectionTime(System.currentTimeMillis());
warpConnection.setName("name" + atomicInteger.getAndAdd(1));
return warpConnection;
} public boolean isTimeOut(long time) {
boolean flag = System.currentTimeMillis() - this.connectionTime >= time;
System.out.println("name is " + this.name + " ,connectionTime is "
+ connectionTime + ", flag is " + flag + " ,time is "+time);
return flag;
} public long getLastWorkTime() {
return lastWorkTime;
} public void setLastWorkTime(long lastWorkTime) {
this.lastWorkTime = lastWorkTime;
} @Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((connection == null) ? 0 : connection.hashCode());
return result;
} @Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
WarpConnection other = (WarpConnection) obj;
if (connection == null) {
if (other.connection != null)
return false;
} else if (!connection.equals(other.connection))
return false;
return true;
} /**
* 查看链接是否有效
*
* @param connectionLonger
* 连接最大时间
* @return
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 5:21:07 PM
* @version 1600000
* @throws SQLException
*/
public boolean veryfiConnection(int connectionLonger) {
try { if (this.connection == null || this.connection.isClosed()
|| isTimeOut(connectionLonger)) {
return true;
}
} catch (SQLException e) {
e.printStackTrace();
}
return false;
}
}

3.连接池:

/**
*
*/
package cn.mjorcen.db.pool; import java.sql.Connection;
import java.sql.SQLException; /**
*
* 数据源*别接口,定义了数据源的基本功能
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:20:21 PM
* @version 1
*/
public interface PooledDataSource {
/**
* 获取链接
*
* @return
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:23:03 PM
* @version 1
* @throws SQLException
*/
Connection getConnection() throws Exception; /**
* 销毁
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:26:00 PM
* @version 1
*/
void destroy() throws Exception; /**
* 释放
*
* @param connection
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:27:09 PM
* @version 1
*/
void release(Connection connection) throws Exception; /**
* 数据源释放可用
*
* @return
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:28:15 PM
* @version 1
*/
boolean isAvailable(); }

一个简单的实现类如下:

/**
*
*/
package cn.mjorcen.db.pool.impl; import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import cn.mjorcen.db.bean.Configuration;
import cn.mjorcen.db.bean.WarpConnection;
import cn.mjorcen.db.pool.PooledDataSource; /**
* 简单的线程池实现
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:24:32 PM
* @version 1
*/
public class AbstractPooledDataSource implements PooledDataSource { protected ConcurrentLinkedQueue<WarpConnection> idleQueue;
protected ConcurrentLinkedQueue<WarpConnection> busyQueue;
protected ThreadLocal<Connection> threadLocal;
protected AtomicInteger totalSize;
protected AtomicInteger currentSize;
protected boolean available;
protected Configuration configuration;
final Lock lock = new ReentrantLock();// 锁
// final Condition notFull = lock.newCondition(); // 实例化两个condition
final Condition notEmpty = lock.newCondition(); public AbstractPooledDataSource(Configuration configuration)
throws Exception {
super();
this.configuration = configuration;
idleQueue = new ConcurrentLinkedQueue<WarpConnection>();
busyQueue = new ConcurrentLinkedQueue<WarpConnection>();
threadLocal = new ThreadLocal<Connection>();
totalSize = new AtomicInteger(0);
currentSize = new AtomicInteger(0);
init();
} /**
*
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 3:49:36 PM
* @version 1
* @throws ClassNotFoundException
*/
private void init() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
for (int i = 0; i < this.configuration.getInitialPoolSize(); i++) {
idleQueue.add(WarpConnection.warp(openConnection()));
}
this.totalSize.set(this.configuration.getInitialPoolSize());
available = true;
} protected Connection openConnection() throws SQLException {
return DriverManager.getConnection(configuration.getUrl(),
configuration.getUser(), configuration.getPassword());
} public Connection getConnection() throws SQLException {
Connection connection = threadLocal.get();
if (connection != null) {
return connection;
}
try {
lock.lock();
WarpConnection warpConnection = null;
try {
warpConnection = this.idleQueue.remove();
} catch (NoSuchElementException e) {
warpConnection = getWarpConnection();
}
veryfiConnection(warpConnection);
warpConnection.setLastWorkTime(System.currentTimeMillis());
this.busyQueue.add(warpConnection);
threadLocal.set(warpConnection.getConnection());
return warpConnection.getConnection();
} finally {
lock.unlock();
}
} /**
* 检查链接状态
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 5:17:06 PM
* @version 1
* @param warpConnection
* @throws SQLException
*/
private void veryfiConnection(WarpConnection warpConnection)
throws SQLException {
if (warpConnection.veryfiConnection(this.configuration
.getConnectionLonger())) {
warpConnection.setConnection(openConnection());
warpConnection.setConnectionTime(System.currentTimeMillis());
}
} /**
*
* @return
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 4:44:52 PM
* @version 1
* @throws SQLException
*/
private WarpConnection getWarpConnection() throws SQLException {
WarpConnection warpConnection = null; if (this.totalSize.get() < configuration.getMaxPoolSize()) {
warpConnection = WarpConnection.warp(openConnection());
this.totalSize.addAndGet(1);
return warpConnection;
}
while (true) {
try {
warpConnection = this.idleQueue.remove();
return warpConnection;
} catch (NoSuchElementException e) {
try {
this.notEmpty.wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
} } public void destroy() {
this.available = false;
ConcurrentLinkedQueue<WarpConnection> _idleQueue = this.idleQueue;
ConcurrentLinkedQueue<WarpConnection> _busyQueue = this.busyQueue;
this.idleQueue = null;
this.busyQueue = null;
this.threadLocal = null;
for (WarpConnection connection : _idleQueue) {
closeQuiet(connection.getConnection());
}
} private void closeQuiet(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
} public void release(Connection connection) throws Exception {
try {
lock.lock();
if (this.available) {
WarpConnection warpConnection = null;
for (WarpConnection element : this.busyQueue) {
if (element.getConnection().equals(connection)) {
warpConnection = element;
break;
}
}
this.busyQueue.remove(warpConnection);
this.idleQueue.add(warpConnection);
// System.out.println("busyQueue = " + busyQueue.size());
// System.out.println("idleQueue = " + idleQueue.size());
threadLocal.set(null);
notEmpty.signal();// 一旦插入就唤醒取数据线程
} else {
closeQuiet(connection);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public boolean isAvailable() {
return available;
} }

调用类:

/**
*
*/
package cn.mjorcen.db.test; import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import cn.mjorcen.db.bean.Configuration;
import cn.mjorcen.db.pool.impl.AbstractPooledDataSource; /**
*
*
* @author mjorcen
* @email mjorcen@gmail.com
* @dateTime Oct 5, 2014 4:00:09 PM
* @version 1
*/
public class Client {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration("product_db");
final AbstractPooledDataSource dataSource = new AbstractPooledDataSource(
conf);
ExecutorService executor = Executors.newFixedThreadPool(10); Runnable r = new Runnable() {
public void run() {
try {
for (int i = 0; i < 3; i++) {
Connection connection = dataSource.getConnection();
System.out.println(Thread.currentThread().getName()
+ " : " + connection);
Thread.sleep(3000);
dataSource.release(connection);
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++) { executor.execute(r);
}
// Connection connection = dataSource.getConnection();
// connection = dataSource.getConnection();
// System.out.println(connection);
// dataSource.release(connection);
}
}

配置文件:

driverClassName=com.mysql.jdbc.Driver
validationQuery=SELECT 1
jdbc_url=jdbc:mysql://115.29.36.149:3306/sai_zd?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
jdbc_username=c
jdbc_password=c
initialPoolSize=3
minPoolSize=3
maxPoolSize=10
maxStatements=30
maxIdleTime=25000
idleConnectionTestPeriod=18000
connectionLonger=3