spring AOP 实现事务和主从读写分离

时间:2022-09-09 23:40:13

spring AOP 实现事务和主从读写分离

1 切面 是个类

2 切入点

3 连接点

4 通知 是个方法

5 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- 引入属性文件 -->
<context:property-placeholder location="classpath:*.properties" /> <!-- 组件自动扫描 -->
<context:component-scan base-package="com.hengxin.qianee">
<context:exclude-filter type="regex" expression="com.hengxin.qianee.wechat.controller"/>
</context:component-scan> <bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="url" value="${jdbc_url_master}" />
<property name="username" value="${jdbc_username_master}" />
<property name="password" value="${jdbc_password_master}" />
<!-- 初始化连接大小 -->
<property name="initialSize" value="0" />
<!-- 连接池最大使用连接数量 -->
<property name="maxActive" value="20" />
<!-- 连接池最大空闲 -->
<!-- <property name="maxIdle" value="20" /> -->
<!-- 连接池最小空闲 -->
<property name="minIdle" value="0" />
<!-- 获取连接最大等待时间 -->
<property name="maxWait" value="60000" />
<property name="validationQuery" value="${validationQuery}" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="testWhileIdle" value="true" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="25200000" />
<!-- 打开removeAbandoned功能 -->
<property name="removeAbandoned" value="true" />
<!-- 1800秒,也就是30分钟 -->
<property name="removeAbandonedTimeout" value="1800" />
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="true" />
<!-- 监控数据库 -->
<!-- <property name="filters" value="stat" /> -->
<property name="filters" value="mergeStat" />
</bean> <bean id="slaveDataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="url" value="${jdbc_url_slave}" />
<property name="username" value="${jdbc_username_slave}" />
<property name="password" value="${jdbc_password_slave}" />
<!-- 初始化连接大小 -->
<property name="initialSize" value="0" />
<!-- 连接池最大使用连接数量 -->
<property name="maxActive" value="20" />
<!-- 连接池最大空闲 -->
<!-- <property name="maxIdle" value="20" /> -->
<!-- 连接池最小空闲 -->
<property name="minIdle" value="0" />
<!-- 获取连接最大等待时间 -->
<property name="maxWait" value="60000" />
<property name="validationQuery" value="${validationQuery}" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="testWhileIdle" value="true" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="25200000" />
<!-- 打开removeAbandoned功能 -->
<property name="removeAbandoned" value="true" />
<!-- 1800秒,也就是30分钟 -->
<property name="removeAbandonedTimeout" value="1800" />
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="true" />
<!-- 监控数据库 -->
<!-- <property name="filters" value="stat" /> -->
<property name="filters" value="mergeStat" />
</bean> <bean id="readWriteDataSource" class="com.hengxin.qianee.util.ReadWriteDataSource">
<property name="writeDataSource" ref="masterDataSource"/>
<property name="readDataSourceMap">
<map>
<entry key="readDataSource1" value-ref="slaveDataSource"/>
<entry key="readDataSource2" value-ref="slaveDataSource"/>
<entry key="readDataSource3" value-ref="slaveDataSource"/>
<entry key="readDataSource4" value-ref="slaveDataSource"/>
</map>
</property>
</bean> <bean id="readWriteDataSourceTransactionProcessor" class="com.hengxin.qianee.util.ReadWriteDataSourceProcessor">
<property name="forceChoiceReadWhenWrite" value="false"/>
</bean>
<!-- myBatis文件 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="readWriteDataSource" />
<!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
<property name="mapperLocations" value="classpath:com/hengxin/qianee/mapper/xml/*.xml" />
</bean> <!-- myBatis扫描文件 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.hengxin.qianee.mapper" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean> <!-- 配置事务管理器 -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="readWriteDataSource" />
</bean> <!-- 拦截器方式配置事物 -->
<tx:advice id="transactionAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="add*" propagation="REQUIRED" />
<tx:method name="append*" propagation="REQUIRED" />
<tx:method name="insert*" propagation="REQUIRED" />
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="modify*" propagation="REQUIRED" />
<tx:method name="edit*" propagation="REQUIRED" />
<tx:method name="delete*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="repair" propagation="REQUIRED" />
<tx:method name="delAndRepair" propagation="REQUIRED" />
<tx:method name="load*" propagation="REQUIRED" />
<tx:method name="do*" propagation="REQUIRED" />
<tx:method name="send*" propagation="REQUIRED" />
<tx:method name="put*" read-only="true"/>
<tx:method name="query*" read-only="true"/>
<tx:method name="use*" read-only="true"/>
<tx:method name="get*" read-only="true" />
<tx:method name="count*" read-only="true" />
<tx:method name="find*" read-only="true" />
<tx:method name="list*" read-only="true" />
<tx:method name="select*" read-only="true" />
<tx:method name="is*" read-only="true" />
<tx:method name="*" propagation="REQUIRED" />
</tx:attributes>
</tx:advice>
<aop:config>
<!-- 切点 -->
<aop:pointcut id="transactionPointcut"
expression="(execution(* com.hengxin.qianee.service.impl.*.*(..)))
or (execution(* com.hengxin.qianee.wechat.service.impl.*.*(..)))" />
<!-- 建议 -->
<aop:advisor pointcut-ref="transactionPointcut"
advice-ref="transactionAdvice" />
<!-- 切面 -->
<aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
<!-- 环绕通知 -->
<aop:around pointcut-ref="transactionPointcut" method="determineReadOrWriteDB"/>
</aop:aspect>
</aop:config>
</beans>

6 serviceimpl 层 每个service方法是个切点 dao方法不是

package com.hengxin.qianee.service.impl;

/**
* 前台首页服务
* @author user
*
*/
@Service
public class FrontMainServiceImpl implements FrontMainService { @Autowired
private ContentAdvertisementsDao ContentAdvertisementsDao;//大广告Dao /**
* Ajax验证用户名是否已存在
*/
@Override
public ErrorInfo hasNameExist(String name, ErrorInfo errorInfo) { // 判断否空
if(StringUtils.isBlank(name)){
errorInfo.code = -1;
errorInfo.msg = "用户名不能为空"; return errorInfo;
} // 判断用户名是否可用
int rows = UserService.isNameExist(name);
if(rows>0){
errorInfo.code = -1;
errorInfo.msg = "该用户名已存在";
}else{
errorInfo.code = 1;
} return errorInfo;
} /**
* 推荐人是否存在
*/
@Override
public ErrorInfo isRecommendExist(String recommend, ErrorInfo errorInfo) { String recoName = ""; if(!StringUtils.isNotBlank(recommend)){
recoName = "";
errorInfo.code = 5;//当推荐人为空时 返回 "5"
return errorInfo;
}else{
//推荐人不为空时,判断邀请码有无此人
recoName = Encrypt.decrypt3DES(recommend, Constants.ENCRYPTION_KEY); //判断用户名是否可用
int rows = UserService.isNameExist(recoName);
if(rows<=0){
errorInfo.code = -1;
errorInfo.msg = "该推荐人不存在,请选填";
}else{
errorInfo.code = 4;
errorInfo.msg = "该推荐人存在";
return errorInfo;
} } return errorInfo;
} /**
* 注册页面发送短信
*/
@Override
public ErrorInfo verifyMobileRe(String mobile,ErrorInfo errorInfo) { // 校验非空
if (StringUtils.isBlank(mobile)) {
errorInfo.code = -1;
errorInfo.msg = "请输入手机号码"; return errorInfo;
} // 校验格式
if (!RegexUtils.isMobileNum(mobile)) {
errorInfo.code = -1;
errorInfo.msg = "请输入正确的手机号码"; return errorInfo;
} Users user = new Users();
boolean flag = false; if (user == null || StringUtils.isBlank(user.getMobile()) || !user.getMobile().equals(mobile)) {
flag = UserService.isMobileExistFlag(mobile);
} if(!flag){
//发短信
smsService.sendCode(mobile, errorInfo);
}else{
errorInfo.code = -1;
errorInfo.msg = "该手机号码已存在";
} return errorInfo;
} /**
* 前台注册用户
*/
@Override
public ErrorInfo addregisterUser(Users user,String path,String contextPath,ErrorInfo errorInfo,String recoName) {
errorInfo.clear(); BackstageSet backstageSet = (BackstageSet) cache.getObject("backstageSet");
user.setCreditLine(backstageSet.getInitialAmount());
user.setLastCreditLine(backstageSet.getInitialAmount()); // 获取注册关键否定词(如:xi*)
String keyWord = backstageSet.getKeywords(); if(StringUtils.isNotBlank(keyWord)){
String [] keywords = keyWord.split(","); for(String word : keywords) {
if(user.getName().contains(word)) {
errorInfo.code = -1;
errorInfo.msg = "对不起,注册的用户名包含敏感词汇,请重新输入用户名"; return errorInfo;
}
}
} if(!recoName.equals("")){
// 根据用户在前台的推荐码解密成推荐人用户名查Id
long recommendedId = userDao.queryIdByUserName(recoName);
if(recommendedId>0){
user.setRecommendUserId(recommendedId);
user.setRecommendRewardType(backstageSet.getCpsRewardType());
user.setRecommendTime(new Date());
}else{
user.setRecommendUserId(0L);
user.setRecommendRewardType(-1);
user.setRecommendTime(null);
}
}else{
// 没有推荐人,推荐人id为0(非空)
user.setRecommendUserId(0L);
} String uuid = UUID.randomUUID().toString(); try {
Qrcode.create(contextPath + "/loginAndRegister/register?un=" + Encrypt.encrypt3DES(user.getName(), Constants.ENCRYPTION_KEY),
BarcodeFormat.QR_CODE,
100, 100,
new File(path,uuid+".png").getAbsolutePath(), "png");
// 读取本地文件
String fileName = uuid.split("\\.")[0]+".png";
File file = new File(path, fileName); // 用户总数
int userCount = UserService.selectUserCount();
// 准备上传至服务器
Map<String, Object> map = fileUploadService.registeredUploadFiles(file, Constants.FileFormat.IMG, userCount, errorInfo );
// 取完之后删除文件
file.delete();
System.out.println(((String) map.get("fileName")).split("%")[1]);
// 截取时间后面一节
user.setQrCode(((String) map.get("fileName")).split("%")[1].split("\\.")[0]);
// 是否禁止登录(false:可以登录)
user.setIsAllowLogin(false);
} catch (WriterException e) {
e.printStackTrace();
System.err.println("生成二维码图像失败!");
} catch (IOException e) {
e.printStackTrace();
System.err.println("生成二维码图像失败!");
} // 注册成功添加用户 每个service方法是个切点 dao方法不是
int rows = userDao.insertUser(user); if(rows<=0){
errorInfo.code = -1;
errorInfo.msg = "此次注册失败!";
return errorInfo;
} MD5 md5 = new MD5();
String sign1 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+Constants.ENCRYPTION_KEY);
String sign2 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+0.00+0.00+Constants.ENCRYPTION_KEY); int updateSign = userDao.updateSign(sign1, sign2, user.getId()); if(updateSign<=0){
errorInfo.code = -1;
errorInfo.msg = "此次注册失败!";
return errorInfo;
} userEventsDao.inserUserEvent(user.getId(), UserEvent.REGISTER, "注册成功", errorInfo); if(errorInfo.code < 0){
return errorInfo;
} // 发送注册站内信
addSendLetter(user, Constants.M_REGISTER,errorInfo); //创建审计项目
statisticUserAuditItemsDao.createAuditItem(user.getId()); errorInfo.code = 0;
errorInfo.msg = "恭喜你,注册成功!"; return errorInfo;
} /**
* 发送站内信
*/
@Override
public ErrorInfo addSendLetter(Users user,long id,ErrorInfo error) { // 获取发送内容和标题
MessageStationTemplates mst = messageStationTemplatesDao.fandMessageStationTemplates(id); // 开启状态(默认 true:开启 false:关闭)
if(mst.getStatus()){
// 添加消息的任务(定时发送)
int rows = messageSendingDao.addMessageTask(user.getId(), mst.getTitle(), mst.getContent());
if(rows<=0){
error.code = -1;
error.msg = "添加失败";
}
}
return error;
} /**
* 忘记密码页面发送短信
*/
@Override
public ErrorInfo sendMobileMessage(String mobile, ErrorInfo errorInfo) { // 校验非空
if (StringUtils.isBlank(mobile)) {
errorInfo.code = -1;
errorInfo.msg = "请输入手机号码"; return errorInfo;
} // 校验格式
if (!RegexUtils.isMobileNum(mobile)) {
errorInfo.code = -1;
errorInfo.msg = "请输入正确的手机号码"; return errorInfo;
} //发短信
smsService.sendCode(mobile, errorInfo); return errorInfo;
} /**
* 查询用户注册协议
*/
@Override
public String queryContent(long id) {
return ContentNewsDao.queryContent(id);
} }

7 切面

package com.hengxin.qianee.util;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils; /**
*
*
* <pre>
*
* 此类实现了两个职责(为了减少类的数量将两个功能合并到一起了):
* 读/写动态数据库选择处理器
* 通过AOP切面实现读/写选择
*
*
* ★★读/写动态数据库选择处理器★★
* 1、首先读取<tx:advice>事务属性配置
*
* 2、对于所有读方法设置 read-only="true" 表示读取操作(以此来判断是选择读还是写库),其他操作都是走写库
* 如<tx:method name="×××" read-only="true"/>
*
* 3、 forceChoiceReadOnWrite用于确定在如果目前是写(即开启了事务),下一步如果是读,
* 是直接参与到写库进行读,还是强制从读库读<br/>
* forceChoiceReadOnWrite:false 表示目前是写,下一步如果是读,强制参与到写事务(即从写库读)
* 这样可以避免写的时候从读库读不到数据
*
* 通过设置事务传播行为:SUPPORTS实现
*
* forceChoiceReadOnWrite:true 表示不管当前事务是写/读,都强制从读库获取数据
* 通过设置事务传播行为:NOT_SUPPORTS实现(连接是尽快释放)
* 『此处借助了 NOT_SUPPORTS会挂起之前的事务进行操作 然后再恢复之前事务完成的』
* 4、配置方式
* <bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor">
* <property name="forceChoiceReadWhenWrite" value="false"/>
* </bean>
*
* 5、目前只适用于<tx:advice>情况 TODO 支持@Transactional注解事务
*
*
*
* ★★通过AOP切面实现读/写库选择★★
*
* 1、首先将当前方法 与 根据之前【读/写动态数据库选择处理器】 提取的读库方法 进行匹配
*
* 2、如果匹配,说明是读取数据:
* 2.1、如果forceChoiceReadOnWrite:true,即强制走读库
* 2.2、如果之前是写操作且forceChoiceReadOnWrite:false,将从写库进行读取
* 2.3、否则,到读库进行读取数据
*
* 3、如果不匹配,说明默认将使用写库进行操作
*
* 4、配置方式
* <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
* <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/>
* </aop:aspect>
* 4.1、此处order = Integer.MIN_VALUE 即最高的优先级(请参考http://jinnianshilongnian.iteye.com/blog/1423489)
* 4.2、切入点:txPointcut 和 实施事务的切入点一样
* 4.3、determineReadOrWriteDB方法用于决策是走读/写库的,请参考
* @see cn.javass.common.datasource.ReadWriteDataSourceDecision
* @see cn.javass.common.datasource.ReadWriteDataSource
*
* </pre>
* @author Zhang Kaitao
*
*/
public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
// private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class); private boolean forceChoiceReadWhenWrite = false; private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>(); /**
* 当之前操作是写的时候,是否强制从从库读
* 默认(false) 当之前操作是写,默认强制从写库读
* @param forceReadOnWrite
*/ public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) { this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
} @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if(!(bean instanceof NameMatchTransactionAttributeSource)) {
return bean;
} try {
NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;
Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
nameMapField.setAccessible(true);
Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource); for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue(); //仅对read-only的处理
if(!attr.isReadOnly()) {
continue;
} String methodName = entry.getKey();
Boolean isForceChoiceRead = Boolean.FALSE;
if(forceChoiceReadWhenWrite) {
//不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可)
//NOT_SUPPORTED会挂起之前的事务
attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());
isForceChoiceRead = Boolean.TRUE;
} else {
//否则 设置为SUPPORTS(这样可以参与到写事务)
attr.setPropagationBehavior(Propagation.SUPPORTS.value());
}
System.out.println("read/write transaction process method:{} force read:{}"+" "+ methodName+" "+ isForceChoiceRead);
readMethodMap.put(methodName, isForceChoiceRead);
} } catch (Exception e) {
throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);
} return bean;
} @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
} private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
super(message, cause);
}
}

//ProceedingJoinPoint 连接点
 
public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable {
if (isChoiceReadDB(pjp.getSignature().getName())) {
ReadWriteDataSourceDecision.markRead();
System.out.println("方法:" + pjp.getSignature().getName() +"进入读库!");
} else {
ReadWriteDataSourceDecision.markWrite();
System.out.println("方法:" + pjp.getSignature().getName() +"进入写库!");
} try {
return pjp.proceed(); } finally {
System.out.println(pjp.getSignature().getName()+" "+"reset方法");
ReadWriteDataSourceDecision.reset();
} } private boolean isChoiceReadDB(String methodName) { String bestNameMatch = null;
for (String mappedName : this.readMethodMap.keySet()) {
if (isMatch(methodName, mappedName)) {
bestNameMatch = mappedName;
break;
}
} Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);
//表示强制选择 读 库
if(isForceChoiceRead == Boolean.TRUE) {
System.out.println("表示强制选择 读 库");
return true;
} //如果之前选择了写库 现在还选择 写库
if(ReadWriteDataSourceDecision.isChoiceWrite()) {
System.out.println("如果之前选择了写库 现在还选择 写库");
return false;
} //表示应该选择读库
if(isForceChoiceRead != null) {
System.out.println("表示应该选择读库");
return true;
}
//默认选择 写库
return false;
} protected boolean isMatch(String methodName, String mappedName) {
return PatternMatchUtils.simpleMatch(mappedName, methodName);
} }
package com.hengxin.qianee.util;

/**
* <pre>
* 读/写动态数据库 决策者
* 根据DataSourceType是write/read 来决定是使用读/写数据库
* 通过ThreadLocal绑定实现选择功能
* </pre>
* @author Zhang Kaitao
*
*/
public class ReadWriteDataSourceDecision { public enum DataSourceType {
write, read;
} private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>(); public static void markWrite() {
holder.set(DataSourceType.write);
} public static void markRead() {
holder.set(DataSourceType.read);
} public static void reset() {
holder.set(null);
} public static boolean isChoiceNone() {
return null == holder.get();
} public static boolean isChoiceWrite() {
return DataSourceType.write == holder.get();
} public static boolean isChoiceRead() {
return DataSourceType.read == holder.get();
} }
package com.hengxin.qianee.util;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; //import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.AbstractDataSource;
import org.springframework.util.CollectionUtils; /**
*
* <pre>
* 读/写动态选择数据库实现
* 目前实现功能
* 一写库多读库选择功能,请参考
* @see cn.javass.common.datasource.ReadWriteDataSourceDecision
@see cn.javass.common.datasource.ReadWriteDataSourceDecision.DataSourceType
*
* 默认按顺序轮询使用读库
* 默认选择写库
*
* 已实现:一写多读、当写时默认读操作到写库、当写时强制读操作到读库
* TODO 读库负载均衡、读库故障转移
* </pre>
* @author Zhang Kaitao
*
*/
public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {
// private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class); private DataSource writeDataSource;
private Map<String, DataSource> readDataSourceMap; private String[] readDataSourceNames;
private DataSource[] readDataSources;
private int readDataSourceCount; private AtomicInteger counter = new AtomicInteger(1); /**
* 设置读库(name, DataSource)
* @param readDataSourceMap
*/
public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
this.readDataSourceMap = readDataSourceMap;
}
public void setWriteDataSource(DataSource writeDataSource) {
this.writeDataSource = writeDataSource;
} @Override
public void afterPropertiesSet() throws Exception {
if(writeDataSource == null) {
throw new IllegalArgumentException("property 'writeDataSource' is required");
}
if(CollectionUtils.isEmpty(readDataSourceMap)) {
throw new IllegalArgumentException("property 'readDataSourceMap' is required");
}
readDataSourceCount = readDataSourceMap.size(); readDataSources = new DataSource[readDataSourceCount];
readDataSourceNames = new String[readDataSourceCount]; int i = 0;
for(Entry<String, DataSource> e : readDataSourceMap.entrySet()) {
readDataSources[i] = e.getValue();
readDataSourceNames[i] = e.getKey();
i++;
} } private DataSource determineDataSource() {
if(ReadWriteDataSourceDecision.isChoiceWrite()) {
System.out.println("current determine write datasource");
return writeDataSource;
} if(ReadWriteDataSourceDecision.isChoiceNone()) {
System.out.println("no choice read/write, default determine write datasource");
return writeDataSource;
}
return determineReadDataSource();
} private DataSource determineReadDataSource() {
//按照顺序选择读库
//TODO 算法改进
int index = counter.incrementAndGet() % readDataSourceCount;
if(index < 0) {
index = - index;
} String dataSourceName = readDataSourceNames[index]; System.out.println("current determine read datasource : {}"+" "+dataSourceName); return readDataSources[index];
} @Override
public Connection getConnection() throws SQLException {
return determineDataSource().getConnection();
} @Override
public Connection getConnection(String username, String password) throws SQLException {
return determineDataSource().getConnection(username, password);
} }