NServiceBus+Saga开发分布式应用

时间:2022-06-06 13:52:15

前言

当你在处理异步消息时,每个单独的消息处理程序都是一个单独的handler,每个handler之间互不影响。这时如果一个消息依赖另一个消息的状态呢? 这时业务逻辑怎么处理?

NServiceBus+Saga开发分布式应用

借用我们上篇文章的业务场景,如果在Ship项目里需要发送一个ShipOrder Command。这个ShipOrder需要依赖Sales.OrderPlaced和Bill.OrderBilled Command的状态,目前我们的两个单独的Message Handler都没有保持任何的状态字段,所以这时如果我们需要完成这个业务模型,就需要跟踪他们的状态。

什么是Saga

这个就是本篇文章要提的saga,定义在NServiceBus框架里,他的本质是一个消息驱动模型里的状态机,或者也可以理解为一系列消息处理程序用来共享状态的业务模型。我理解在消息队列里如果我们要保证消息一致性通常会自己创建一张Event表,这里saga维持状态的角色有点像我们这里的Event表。

   

NServiceBus+Saga开发分布式应用

   好的,回到正题上,如果我们需要在Shipping Service里发送一个ShipOrder,发送他之前需要确定OrderPlaced和OrderBilled的状态,确保这两个消息都收到以后才能发送ShipOrder。

如何使用Saga

当然,我暂且理解Saga的目的是为了处理在长时间运行的任务里保证数据一致性这样的一个角色。

Saga状态

saga状态主要是告诉NServiceBus在处理数据一致性的判断逻辑,这里需要继承抽象类ContainSagaData,在我们这个业务场景中则主要是判断OrderPlaced和OrderBilled消息是否已经接收到并处理。

public class ShippingPolicyData:ContainSagaData
{
public string OrderId { get; set; }
public bool IsOrderPlaced { get; set; }
public bool IsOrderBilled { get; set; }
}

Saga如何工作

有了状态以后,我们还需要一个“handler”来告诉NServiceBus,在这个handler里主要用来处理消息数据一致性,我看了官方文档后,他们建议我们这里的handler角色使用Policy后缀命名,当然我觉的也可以用Saga后缀命名,比如ShippingPolicy或者ShippingSaga。

   同时这里我们这个handler觉色还要继承Saga类,Saga类主要重写方法ConfigureHowToFindSaga,这个方法的作用主要是在接受的消息和我们的Saga实体之间建立映射关系。

 public class ShipPolicy:Saga<ShippingPolicyData>,
IAmStartedByMessages<OrderPlaced>,
IAmStartedByMessages<OrderBilled> //都可以创建Saga实例
{
private static ILog log = LogManager.GetLogger<ShipPolicy>(); protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)
{
mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId); } public Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
log.Info("OrderPlaced message received ");
this.Data.IsOrderPlaced = true;
return ProcessOrder(context);
} public Task Handle(OrderBilled message, IMessageHandlerContext context)
{ log.Info("OrderBilled message received");
this.Data.IsOrderBilled = true;
return ProcessOrder(context);
} private async Task ProcessOrder(IMessageHandlerContext context)
{
if (Data.IsOrderBilled && Data.IsOrderPlaced)
{
await context.SendLocal(new ShipOrder()
{
OrderId = Data.OrderId
}); MarkAsComplete();
}
}
}

这个类里你会发现还实现了接口**IAmStartedByMessages, **这个接口主要是告诉Saga,不论是那种消息类型先进来,都可以创建一个Saga实例,就比如是Event表,不管那个消息进来,都需要先插入一条数据,后续消息再进来时要更新数据状态,当然,这里的Saga实例也好,Event表也好,关键问题就是有效标识,或者叫主键,我们这个业务模型里,OrderPlaced和OrderBilled都包含一个属性OrderId, 这里Saga实例则使用这个OrderId做关键属性。

发送ShipOrder Command

到这里也就是我们的OrderPlaced和OrderBIlled消息都收到了,业务逻辑符合要求,可以发送ShipOrder消息了,也就是用户创建了订单,付了款,可以发货了。

NServiceBus+Saga开发分布式应用

新建ShipOrder类

public class ShipOrder:ICommand
{
public string OrderId { get; set; }
}

新建ShipOrderHandler

public class ShipOrderHandler:IHandleMessages<ShipOrder>
{
private static ILog log = LogManager.GetLogger<ShipOrderHandler>();
public Task Handle(ShipOrder message, IMessageHandlerContext context)
{
log.Info($"Order [{message.OrderId}] - Successfully shipped");
return Task.CompletedTask;
}
}

运行Shipping项目,看到下图,则说明程序运行成功,我们这个业务场景里OrderPlaced消息肯定先接受到,OrderBilled消息后接受到。

NServiceBus+Saga开发分布式应用

参考链接

https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/

https://docs.particular.net/nservicebus/sagas/