使用c#的 async/await编写 长时间运行的基于代码的工作流的 持久任务框架

时间:2022-11-25 15:08:11

持久任务框架 (DTF) 是基于async/await 工作流执行框架。工作流的解决方案很多,包括Windows Workflow Foundation,BizTalk,Logic Apps, Workflow-CoreElsa-Core。最近我在Dapr 的仓库里跟踪工作流构建块的进展时,深入了解了一下,这个DTFx在Azure 基础设施有大量的应用,现在Dapr团队正在把这个实践抽象成工作流构建块,具体参看https://github.com/dapr/dapr/issues/4576。DTFx 正好是.NET开发的,所以对他多了几分关注,以前没有深入进去看看,现在我觉得是值得推荐给大家的一个工作流方案,它足够轻量级,而且非常简单,依赖很少。

持久任务框架是一个开源框架,它为 .NET 平台中的工作流即代码提供了基础。GitHub上:https://github.com/Azure/durabletask

它有两个主要组件:业务流程和任务。业务流程“编排”应用程序逻辑,以内联方式执行自定义代码并调用任务。自定义业务流程派生自 TaskOrchestration<TResult, TInput>自定义任务派生自 TaskActivity<TInput, TResult>。

推荐大家从这两个仓库可用来学习和生产使用。

Microsoft.Extensions.Hosting包装器: https://github.com/jviau/durabletask-hosting

持久任务框架扩展: https://github.com/lucaslorentz/durabletask-extensions


我们一起来看下持久任务框架的Hello world: 代码来自https://github.com/jviau/durabletask-hosting 的 DurableTask.Samples:

这个非常简单的业务流程“GreetingsOrchestration”,有两个称为任务“GetUserTask”,它执行名称提示和“SendGreetingTask”,它将问候语写入控制台。

GreetingsOrchestration 派生自 TaskOrchestration<string、string> 并具有调用 GetUserTask 和 SendGreetingTask 的 RunTask 方法。

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task orchestration for greeting a user.
/// </summary>
public class GreetingsOrchestration : TaskOrchestration<string, string>
{
     /// <inheritdoc />
     public override async Task<string> RunTask(OrchestrationContext context, string input)
     {
         string user = await context.ScheduleTask<string>(typeof(GetUserTask));
         string greeting = await context.ScheduleTask<string>(typeof(SendGreetingTask), user);
         return greeting;
     }
}

GetUserTask 派生自 TaskActivity<string,string> 并实现了 Execute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task activity for getting a username from console.
/// </summary>
public class GetUserTask : TaskActivity<string, string>
{
     private readonly IConsole _console;

    /// <summary>
     /// Initializes a new instance of the <see cref="GetUserTask"/> class.
     /// </summary>
     /// <param name="console">The console output helper.</param>
     public GetUserTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

    /// <inheritdoc />
     protected override string Execute(TaskContext context, string input)
     {
         _console.WriteLine("Please enter your name:");
         return _console.ReadLine();
     }
}

SendGreetingTask 派生自 TaskActivity<string、string> 并实现了 Excute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task for sending a greeting.
/// </summary>
public sealed class SendGreetingTask : AsyncTaskActivity<string, string>
{
     private readonly IConsole _console;

    /// <summary>
     /// Initializes a new instance of the <see cref="SendGreetingTask"/> class.
     /// </summary>
     /// <param name="console">The console output helper.</param>
     public SendGreetingTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

    /// <inheritdoc />
     protected override async Task<string> ExecuteAsync(TaskContext context, string user)
     {
         string message;
         if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))
         {
             message = "GetUser Timed out!!!";
             _console.WriteLine(message);
         }
         else
         {
             _console.WriteLine("Sending greetings to user: " + user + "...");
             await Task.Delay(5 * 1000);
             message = "Greeting sent to " + user;
             _console.WriteLine(message);
         }

        return message;
     }
}

上面的这个例子非常基础,我们在项目中要把它用起来就要用到这个扩展项目 https://github.com/lucaslorentz/durabletask-extensions。这个项目通过更多功能扩展持久任务框架,并使其更易于使用,目前还在开发过程中,尚未达到投入生产的程度。包含了下列这些功能,让你在任何地方都可以运行。

  • 更多定义存储功能的接口
  • 依赖注入集成
  • EF Core MySql/PostgreSQL/SqlServer storages
  • 分布式工作线程:允许在多个工作线程中拆分业务流程/活动实现
  • 通过 GRPC 协议进行间接存储访问:将您的存储选择和配置集中在单个组件中。
  • 用户界面
  • BPMN 运行器

示例文件夹中,您可以找到经典书籍《飞行、汽车、酒店》的实现,其中包含补偿问题。

该示例旨在演示具有以下组件的微服务体系结构:

  • 服务器:连接到存储并将其公开为 GRPC 终结点。
  • 应用程序接口:公开 REST API 以管理业务流程。
  • 用户界面:公开用于管理业务流程的 UI。
  • 业务流程工作线程:为给定问题实现BookParallelBookSquential业务流程。
  • 飞行工作人员:实施预订航班和取消航班活动。
  • 车夫:实施“预订汽车”和“取消汽车”活动。
  • 酒店工作人员:实施预订酒店和取消酒店活动。
  • BPMNWorker:一个建立在持久任务之上的实验性 BPMN 运行器。对于给定的问题,还有BookParallelBookSequentialBPMN 工作流。

使用c#的 async/await编写 长时间运行的基于代码的工作流的 持久任务框架