conductor 事件处理程序

时间:2022-02-20 14:04:05

Introduction

conductor中的事件提供工作流之间的松散耦合,并支持从外部系统生成和消耗事件。

包括:

1. 能够在外部系统像SQS或Conductor内部生成一个事件(消息)。

2. 当发生与提供的条件匹配的特定事件时启动工作流。

conductor提供了SUB_WORKFLOW任务,可用于将工作流嵌入到父工作流程中。 事件支持提供类似的功能,而无需显式添加依赖关系,并提供了一个无休止的风格集成。

Event Task

事件任务提供将事件(消息)发布到conductor或外部事件系统(如SQS)的能力。 事件任务对于为工作流和任务创建基于事件的依赖非常有用。

Event Handler

事件处理程序是已注册的监听器,当发生匹配事件时执行一个操作。 支持的操作是:

  1. Start a Workflow
  2. Fail a Task
  3. Complete a Task

事件处理程序可以配置为监听Conductor Events或外部事件(如SQS)。

Configuration

事件处理程序通过/ event / API配置。

结构:

{
"name" : "descriptive unique name",
"event": "event_type:event_location",
"condition": "boolean condition",
"actions": ["see examples below"]
}
条件:
条件是必须评估为一个布尔值的表达式。 支持类似JavaScript的语法,可用于基于有效载荷评估条件。 只有当条件求值为true时才执行操作。 给定消息中的以下有效内容:
{
"fileType": "AUDIO",
"version": 3,
"metadata": {
length: 300,
codec: "aac"
}
}
表达式:
$.version > 1    true
$.version > 10   false
$.metadata.length == 300  true 操作

Start A Workflow

{
"action": "start_workflow",
"start_workflow": {
"name": "WORKFLOW_NAME",
"version": <optional>
"input": {
"param1": "${param1}"
}
}
}

Complete Task*

{
"action": "complete_task",
"complete_task": {
"workflowId": "${source.externalId.workflowId}",
"taskRefName": "task_1",
"output": {
"response": "${source.result}"
}
},
"expandInlineJSON": true
}

Fail Task*

{
"action": "fail_task",
"fail_task": {
"workflowId": "${source.externalId.workflowId}",
"taskRefName": "task_1",
"output": {
"response": "${source.result}"
}
},
"expandInlineJSON": true
}

启动工作流的输入和完成/失败任务的输出 遵循用于引导工作流输入的相同表达式。