CQRS学习——Storage实现(EF+Code First+DynamicReponsitory)[其四]

时间:2021-11-08 19:06:42

【这里是的实现,指的是针对各个数据访问框架的一个基础实现】

目标

  •   定义仓储/QueryEntry的基本功能
  •   实现仓储的基本功能,以利于复用
  •   实现一些常用的功能
  •   提供一些便利的功能

目标框架

博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。

仓储基本功能

使用一个泛型接口定义了一个仓储需要实现的功能:

public interface IBasicReponsitory<T>
{
void Insert(T item);
void Delete(T item);
void Delete(Guid aggregateId);
void Update(T category);
T Fetch(Guid aggregateId);
T TryFetch(Guid aggregateId);
bool Exists(Expression<Func<T, bool>> predict); /*以下是额外的一些接口方法,待商榷*/
IQueryable<T> Query();
Task<T> FetchAsync(Guid id);
Task<T> TryFetchAsync(Guid id);
Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict);
}

以及一个QueryEntry需要实现的一些基本功能:

public interface IQueryEntry<T> where T : IHasPrimaryKey
{
T TryFetch(Guid id);
Task<T> TryFetchAsync(Guid id); bool Exsits(Guid id);
bool Exsits(Expression<Func<T, bool>> selector);
}

随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。

为了方便QueryEntry的实现,提供了一个抽象类:

public abstract class ReponsitoryBasedQueryEntry<T> : IQueryEntry<T> where T : IHasPrimaryKey
{
public abstract IBasicReponsitory<T> BasicReponsitory { get; } public T TryFetch(Guid id)
{
return BasicReponsitory.TryFetch(id);
} public Task<T> TryFetchAsync(Guid id)
{
return BasicReponsitory.TryFetchAsync(id);
} public bool Exsits(Guid id)
{
return BasicReponsitory.Query().Any(i => i.Id == id);
} public bool Exsits(System.Linq.Expressions.Expression<Func<T, bool>> selector)
{
return BasicReponsitory.Query().Any(selector);
}
}

基本实现

public class BasicEntityFrameworkReponsitory<T> : IBasicReponsitory<T> where T : class, IHasPrimaryKey
{
public BasicEntityFrameworkReponsitory()
{
Table = StorageConfiguration.DbContext.Set<T>();
} public DbSet<T> Table { get; private set; } public virtual void Insert(T item)
{
Table.Add(item);
} public virtual void Delete(T item)
{
Table.Remove(item);
} public virtual void Delete(Guid aggregateId)
{
var item = TryFetch(aggregateId);
Delete(item);
} public virtual void Update(T category)
{
//do nothing...
} public T Fetch(Guid aggregateId)
{
var item = TryFetch(aggregateId);
if (item == null)
{
throw new AggregateRootNotFoundException(aggregateId);
}
return item;
} public T TryFetch(Guid aggregateId)
{
var item = Query().FirstOrDefault(i => i.Id == aggregateId);
return item;
} public virtual IQueryable<T> Query()
{
return Table;
} public async Task<T> FetchAsync(Guid id)
{
return await Table.FirstAsync(i => i.Id == id);
} public async Task<T> TryFetchAsync(Guid id)
{
return await Table.FirstOrDefaultAsync(i => i.Id == id);
} public bool Exists(Expression<Func<T, bool>> predict)
{
return Table.Any(predict);
} public async Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict)
{
return await Table.Where(predict).ToArrayAsync();
}
}

这部分代码表达了个人的几个想法:
1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。

2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。

常用的功能:软删除

这里是继承基本实现的一个实现:

public class SoftDeleteEntityFrameworkReponsitory<T> : BasicEntityFrameworkReponsitory<T>
where T : class, IHasPrimaryKey, ISoftDelete
{
public override IQueryable<T> Query()
{
return base.Query().Where(i => !i.IsDeleted);
} public override void Delete(T item)
{
item.IsDeleted = true;
Update(item);
}
}

这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:

public interface ISoftDelete
{
bool IsDeleted { get; set; }
}

同时override了Query()方法,过滤了已删除的内容。

常用的功能:操作跟踪

好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:

/// <summary>
/// 既然开启了跟踪,那么这条数据必然是不能硬删除的
/// </summary>
/// <typeparam name="T"></typeparam>
public class TraceEnabledEntityFrameworkReponsitory<T> : SoftDeleteEntityFrameworkReponsitory<T>
where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey
{
/// <summary>
/// 开启跟踪时,不允许匿名操作
/// </summary>
[Dependency]
public IDpfbSession Session { get; set; } public override void Update(T item)
{
if (!Session.UserId.HasValue)
throw new Exception(); //todo 提供一个明确的异常
item.LastModifiedBy = Session.UserId.Value;
item.LastModifiedTime = DateTime.Now;
} public override void Insert(T item)
{
if (!Session.UserId.HasValue)
throw new Exception(); //todo 提供一个明确的异常
item.LastModifiedBy = Session.UserId.Value;
item.LastModifiedTime = DateTime.Now;
base.Insert(item);
}
}

不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。

便利的功能:动态仓储(DynamicReponsitory)

前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。

这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。

模型转换

先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:

public class DataReaderTransfer<T> : CacheBlock<string, Func<IDataReader, T>> where T : new()
{
protected DataReaderTransfer()
{
} /// <summary>
///
/// </summary>
/// <param name="filedsNameArray"></param>
/// <param name="key">编译缓存所使用的key,建议使用查询字符串的hash</param>
/// <returns></returns>
public Func<IDataReader, T> Compile(string[] filedsNameArray, string key)
{
var outType = typeof (T);
var func = ConcurrentDic.GetOrAdd(key, k =>
{
var expressions = new List<Expression>();
//public T xxx(IDataReader reader){
var param = Expression.Parameter(typeof (IDataReader)); //var instance = new T();
var newExp = Expression.New(outType);
var varExp = Expression.Variable(outType, "instance");
var varAssExp = Expression.Assign(varExp, newExp);
expressions.Add(varAssExp); var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""]
foreach (var fieldName in filedsNameArray)
{
//if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value; var prop = outType.GetProperty(fieldName);
if (prop == null)
continue;
var propExp = Expression.PropertyOrField(varExp, fieldName);
Expression value = Expression.MakeIndex(param, indexProp,
new Expression[] {Expression.Constant(fieldName)}); //处理空值
var defaultExp = Expression.Default(prop.PropertyType);
var isDbNullExp = Expression.TypeIs(value, typeof (DBNull)); //处理枚举以及可空枚举
if (prop.PropertyType.IsEnum ||
prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[].IsEnum)
{
value = Expression.Convert(value, typeof (int));
}
var convertedExp = Expression.Convert(value, prop.PropertyType);
//读取到dbnull的时候,使用一个默认值
var condExp = Expression.IfThenElse(isDbNullExp,
Expression.Assign(propExp, defaultExp),
Expression.Assign(propExp, convertedExp)); expressions.Add(condExp);
} //return instance;
var retarget = Expression.Label(outType);
var returnExp = Expression.Return(retarget, varExp);
expressions.Add(returnExp); //}
var relabel = Expression.Label(retarget, Expression.Default(outType));
expressions.Add(relabel); var blockExp = Expression.Block(new[] {varExp}, expressions);
var expression = Expression.Lambda<Func<IDataReader, T>>(blockExp, param);
return expression.Compile();
});
return func;
} public Func<IDataReader, T> Compile(IDataReader reader, string key)
{
var length = reader.FieldCount;
var names = Enumerable.Range(, length).Select(i => reader.GetName(i - )).ToArray();
return Compile(names, key);
} public static DataReaderTransfer<T> Instance = new DataReaderTransfer<T>(); //基于反射的映射....
//private static T DynamicMap<T>(IDataReader reader) where T : new()
//{
// var instance = new T();
// var count = reader.FieldCount;
// while (count-- > 1)
// {
// object value = reader[count - 1];
// var name = reader.GetName(count - 1);
// var prop = typeof (T).GetProperty(name);
// if (prop == null)
// {
// continue;
// }
// if (value is DBNull)
// {
// value = null;
// }
// prop.SetValue(instance, value);
// }
// return instance;
//}
}

主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。

链式调用以及延时查询

先来看一段调用代码:

[TestClass]
public class DynamicReponsitorySamples
{
static DynamicReponsitorySamples()
{
//DbContext 配置
StorageConfiguration.Config.Use<DbContext, ObjectManageContext>(new ContainerControlledLifetimeManager());
//无法使用.UseDbContext<ObjectManageContext>(),因为无法提供基于HTTP生命周期的管理对象
DynamicReponsitory = new DynamicReponsitory();
} public static DynamicReponsitory DynamicReponsitory { get; set; } [TestMethod]
public void Query()
{
//直接提交一个SQL查询,并映射到实体
var queryText =
"SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code";
var query = DynamicReponsitory.Query<AdminListItem>(queryText);
//QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作
query.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName)));
} [TestMethod]
public void Count()
{
//可以直接执行一个COUNT(*)语句
var countQueryText = "SELECT COUNT(*) FROM [ADMIN]";
var countQuery = DynamicReponsitory.Count(countQueryText);
Trace.WriteLine("Count:" + countQuery.Value);
//可以提供一个SELECT * 语句
countQueryText = "SELECT * FROM [ADMIN]";
//但是需要将重载的第二个参数置为true
countQuery = DynamicReponsitory.Count(countQueryText, true);
Trace.WriteLine("Count:" + countQuery.Value);
//可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通
var query = DynamicReponsitory.Query<AdminListItem>(
"SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
countQuery = query.CountAmount();
//Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据
Trace.WriteLine("Count:" + countQuery.Value);
Trace.WriteLine("Count:" + countQuery.Value);
//如果需要重新查询,可以调用Result.ReQuery()方法
var reQuery = countQuery.ReQuery();
Trace.WriteLine("Count:" + reQuery.Value);
} /// <summary>
/// 分页调用,支持分页信息和分页列表信息的无序访问
/// </summary>
[TestMethod]
public void Page()
{
//可以对所有的query对象执行Page()扩展方法,从而进行分页
//必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行)
var query = DynamicReponsitory.Query<AdminListItem>(
"SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
var paged = query.Page("ORDER BY DepartmentName", , ); /*
* 以下表示支持分页信息和分页列表信息的无序访问
* 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息
*/ Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
paged.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName)));
//重复访问会导致访问内存中缓存的数据
var resultArray = paged.Take();
Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName)));
}
}

链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。
为了实现延时查询的目标,引入了这几个类型:

public class SqlQueryExpression : ICloneable
{
public SqlQueryExpression()
{
Parameters = new List<object>();
} public SqlQueryExpression(string expressionText) : this()
{
ExpressionText = expressionText;
} public string ExpressionText { get; set; }
public IList<object> Parameters { get; private set; } public IDataReader Read(DbConnection connection)
{
var parameters = Parameters.ToArray();
if (connection.State != ConnectionState.Open)
connection.Open();
//查询,开启最低级别的事务隔离,防止默认事务产生争用锁
var trans = connection.BeginTransaction(IsolationLevel.ReadUncommitted);
var command = connection.CreateCommand();
command.CommandType = CommandType.Text;
command.CommandText = ExpressionText;
command.Parameters.AddRange(parameters);
command.Transaction = trans;
return command.ExecuteReader(CommandBehavior.CloseConnection);
} public virtual object Clone()
{
//实现拷贝接口
var cloned = new SqlQueryExpression(ExpressionText);
Parameters.Foreach(i =>
{
var parameter = (SqlParameter) i;
var clonedParameter = new SqlParameter(parameter.ParameterName, parameter.Value);
clonedParameter.Direction = parameter.Direction;
cloned.Parameters.Add(clonedParameter);
});
return cloned;
}
}
 public class SqlQueryResult
{
public SqlQueryExpression SqlQueryExpression { get; private set; }
public DbConnection DbConnection { get; private set; }
public virtual bool Enumerated { get; protected set; }
protected IDataReader DataReader; protected void Query()
{
DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection);
} public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
{
SqlQueryExpression = expression;
DbConnection = connection;
}
} /// <summary>
/// 代表DynamicReponsitory的查询结果
/// </summary>
/// <typeparam name="T">代表需要构造的类型</typeparam>
public class SqlQueryResult<T> : SqlQueryResult, IEnumerable<T> where T : new()
{
public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
: base(expression, connection)
{ } public IEnumerator<T> GetEnumerator()
{
//对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失
if (!Enumerated)
{
Query();
using (DataReader)
{
Enumerated = true;
var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText;
var func = DataReaderTransfer<T>.Instance.Compile(DataReader, uniqueKey);
while (DataReader.Read())
{
var item = func(DataReader);
ResultSet.Add(item);
//yield return item;
}
}
}
return ResultSet.GetEnumerator();
//return ((IEnumerable<T>) ResultSet).GetEnumerator();
} protected List<T> ResultSet = new List<T>(); IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
} public SqlQueryResult<T> ReQuery()
{
var exp = SqlQueryExpression.Clone() as SqlQueryExpression;
return new SqlQueryResult<T>(exp, DbConnection);
}
}

SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。
关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。

为了防止查询被锁住,默认开启了最低的事务隔离级别。

...

【想到什么再补充】