MyBatis框架的使用及源码分析(九) Executor

时间:2022-10-28 06:44:55

从<MyBatis框架的使用及源码分析(八) MapperMethod>文中我们知道执行Mapper的每一个接口方法,最后调用的是MapperMethod.execute方法。而当执行MapperMethod的execute方法的时候,根据当前MapperMethod对应的mapper配置会执行Session的insert, update, delete, select, selectList, selectMap, selectCursor, selectOne或flushStatements方法。

我们来看DefaultSqlSession一个具体的实现方法:

  public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
MappedStatement ms = configuration.getMappedStatement(statement);
List<E> result = executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
return result;
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

我们看到,真正执行的代码主体在Executor对象里面。

DefaultSqlSession类的成员executor是在构造函数里面给他赋值的。所以我们又要回头去查看一下是在什么时候实例化了DefaultSqlSession类。

DefaultSqlSession在SqlSessionFactory的实现类DefaultSqlSessionFactory中被创建:

private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
} private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
try {
boolean autoCommit;
try {
autoCommit = connection.getAutoCommit();
} catch (SQLException e) {
// Failover to true, as most poor drivers
// or databases won't support transactions
autoCommit = true;
}
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
final Transaction tx = transactionFactory.newTransaction(connection);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

从源码中我们可以看到他是通过Configuration类的newExecutor方法来得到的,代码如下:

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {  
//根据executorType来选择实现子类 
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}

Mybatis对外统一提供了一个操作接口类Executor,提供的接口方法有update、query、flushStatements、commit、rollback等接口函数,源码如下:

package org.apache.ibatis.executor;

import java.sql.SQLException;
import java.util.List; import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.transaction.Transaction; /**
* @author Clinton Begin
*/
public interface Executor { ResultHandler NO_RESULT_HANDLER = null; int update(MappedStatement ms, Object parameter) throws SQLException; <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException; <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException; List<BatchResult> flushStatements() throws SQLException;
//事物提交
void commit(boolean required) throws SQLException;
//事物回滚
void rollback(boolean required) throws SQLException;
//创建CacheKey
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
//是否开启缓存
boolean isCached(MappedStatement ms, CacheKey key);
//清除本地缓存
void clearLocalCache();
//延迟加载
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
//获取事物管理器
Transaction getTransaction();
//关闭连接
void close(boolean forceRollback);
//Executor是否关闭
boolean isClosed(); void setExecutorWrapper(Executor executor); }

具体实现类有抽象类BaseExecutor、实现类CachingExecutor、实现类BatchExecutor、实现类ReuseExecutor和实现类SimpleExecutor。

具体选用哪个子类SimpleExecutor、ReuseExecutor和BatchExecutor实现,可以在Mybatis的配置文件中进行配,配置如下:

<settings>
<setting name="defaultExecutorType" value="REUSE"/> <!--SIMPLE、REUSE、BATCH-->
</settings>

配置之后在Configuration类中的newExecutor()函数会选择具体使用的子类。

抽象类BaseExecutor的实现源码及解析如下如下:

package org.apache.ibatis.executor;

import static org.apache.ibatis.executor.ExecutionPlaceholder.EXECUTION_PLACEHOLDER;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.cache.impl.PerpetualCache;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.logging.jdbc.ConnectionLogger;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.ParameterMapping;
import org.apache.ibatis.mapping.ParameterMode;
import org.apache.ibatis.mapping.StatementType;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.factory.ObjectFactory;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.LocalCacheScope;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.transaction.Transaction;
import org.apache.ibatis.type.TypeHandlerRegistry; /**
* @author Clinton Begin
*/
public abstract class BaseExecutor implements Executor { private static final Log log = LogFactory.getLog(BaseExecutor.class);
//事务
protected Transaction transaction;
//执行器的包装对象
protected Executor wrapper;
//线程安全队列
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
//本地缓存
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
//mybatis的配置信息
protected Configuration configuration;
//查询堆栈
protected int queryStack = 0;
private boolean closed; protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
} public Transaction getTransaction() {
if (closed) throw new ExecutorException("Executor was closed.");
return transaction;
} public void close(boolean forceRollback) {
try {
try {
rollback(forceRollback);
} finally {
if (transaction != null) transaction.close();
}
} catch (SQLException e) {
// Ignore. There's nothing that can be done at this point.
log.warn("Unexpected exception on closing transaction. Cause: " + e);
} finally {
transaction = null;
deferredLoads = null;
localCache = null;
localOutputParameterCache = null;
closed = true;
}
} public boolean isClosed() {
return closed;
}

 //SqlSession的update/insert/delete会调用此方法
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) throw new ExecutorException("Executor was closed.");
//先清局部缓存,再更新,如何更新由子类实现,模板方法模式
clearLocalCache();
return doUpdate(ms, parameter);
} public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
} public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
if (closed) throw new ExecutorException("Executor was closed.");
return doFlushStatements(isRollBack);
}

//SqlSession.selectList会调用此方法 
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);//得到绑定sql  
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);//创建缓存key
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);//查询  
} @SuppressWarnings("unchecked")
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) throw new ExecutorException("Executor was closed.");
//先清局部缓存,再查询,但仅仅查询堆栈为0才清,为了处理递归调用 
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;//加一,这样递归调用到上面的时候就不会再清局部缓存了 
//根据cachekey从localCache去查 
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
//如果查到localCache缓存,处理localOutputParameterCache  
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
//从数据库查
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
//延迟加载队列中所有元素 
deferredLoad.load();
}
//清空延迟加载队列
deferredLoads.clear(); // issue #601
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
//如果是statement,清本地缓存  
clearLocalCache(); // issue #482
}
}
return list;
}

//延迟加载 
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
if (closed) throw new ExecutorException("Executor was closed.");
DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
if (deferredLoad.canLoad()) {//如果能加载则立即加载,否则加入到延迟加载队列中
deferredLoad.load();
} else {
deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));
}
}

//创建缓存key 
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) throw new ExecutorException("Executor was closed.");
CacheKey cacheKey = new CacheKey();
//MyBatis 对于其 Key 的生成采取规则为:[mappedStementId + offset + limit + SQL + queryParams + environment]生成一个哈希码
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
for (int i = 0; i < parameterMappings.size(); i++) { // mimic DefaultParameterHandler logic
ParameterMapping parameterMapping = parameterMappings.get(i);
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
return cacheKey;
} public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
} public void commit(boolean required) throws SQLException {
if (closed) throw new ExecutorException("Cannot commit, transaction is already closed");
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
} public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
clearLocalCache();
flushStatements(true);
} finally {
if (required) {
transaction.rollback();
}
}
}
}

//清空本地缓存,一个map结构  
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
} protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException; protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException; protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException; protected void closeStatement(Statement statement) {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
// ignore
}
}
}

//处理存储过程的out参数
private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
final Object cachedParameter = localOutputParameterCache.getObject(key);
if (cachedParameter != null && parameter != null) {
final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
final MetaObject metaParameter = configuration.newMetaObject(parameter);
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
final String parameterName = parameterMapping.getProperty();
final Object cachedValue = metaCachedParameter.getValue(parameterName);
metaParameter.setValue(parameterName, cachedValue);
}
}
}
}
}

//从数据库中查 
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);//向缓存中放入占位符  
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);//清除占位符
}
localCache.putObject(key, list);//加入缓存
if (ms.getStatementType() == StatementType.CALLABLE) {//如果是存储过程,OUT参数也加入缓存
localOutputParameterCache.putObject(key, parameter);
}
return list;
} protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
} public void setExecutorWrapper(Executor wrapper) {
this.wrapper = wrapper;
}

//延迟加载  
private static class DeferredLoad { private final MetaObject resultObject;
private final String property;
private final Class<?> targetType;
private final CacheKey key;
private final PerpetualCache localCache;
private final ObjectFactory objectFactory;
private final ResultExtractor resultExtractor; public DeferredLoad(MetaObject resultObject,
String property,
CacheKey key,
PerpetualCache localCache,
Configuration configuration,
Class<?> targetType) { // issue #781
this.resultObject = resultObject;
this.property = property;
this.key = key;
this.localCache = localCache;
this.objectFactory = configuration.getObjectFactory();
this.resultExtractor = new ResultExtractor(configuration, objectFactory);
this.targetType = targetType;
}

//缓存中找到,且不为占位符,代表可以加载 
public boolean canLoad() {
return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;
}

//加载
public void load() {
@SuppressWarnings( "unchecked" ) // we suppose we get back a List
List<Object> list = (List<Object>) localCache.getObject(key);
Object value = resultExtractor.extractObjectFromList(list, targetType);
resultObject.setValue(property, value);
} } }