Apache Beam / Google Dataflow - Error Handling

Time: 2021-11-23 14:40:26

I have a pipeline with quite a few steps (just over 15). I want to report a failure every time a DoFn fails. I started implementing this with TupleTags, using code such as:

try {
  // ... do stuff ...
  c.output(successTag, ...);
} catch (Exception e) {
  // Route the failed element to the separate failure output
  c.output(failureTag, new Failure(...));
}

But since my pipeline contains a lot of steps, this makes the pipeline definition code quite hard to read and maintain.

Is there a more global way to achieve this? Something like raising a custom exception that is handled globally at the pipeline level?

1 Solution

#1

What you are doing is the correct approach to catching errors and routing them to a separate output. You will still need it in every step, though. If you prefer, you can use a standard Java pattern to reuse it: create a base class for all your DoFns (the classes you pass to ParDo) and put the exception-handling code in its processElement. Then implement each step's actual logic in a separate method (e.g. processElementImpl), which the base class's processElement calls inside its try block.
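A minimal, dependency-free sketch of that base-class (template method) idea in plain Java, assuming the goal is simply to write the try/catch once. To keep it runnable without a Beam runner, the hypothetical Output interface stands in for Beam's DoFn.ProcessContext, and SafeFn, ParseIntFn, CollectingOutput, and the fields of Failure are illustrative names, not Beam classes (the question does not show its Failure class).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beam's DoFn.ProcessContext: one method per output.
interface Output<O> {
  void success(O value);
  void failure(Failure failure);
}

// Hypothetical failure record; the question's real Failure class is not shown.
final class Failure {
  final String step;
  final String input;
  final String message;

  Failure(String step, String input, String message) {
    this.step = step;
    this.input = input;
    this.message = message;
  }
}

// Base class: the try/catch is written once here instead of in every step.
abstract class SafeFn<I, O> {
  private final String stepName;

  protected SafeFn(String stepName) {
    this.stepName = stepName;
  }

  // Plays the role of the @ProcessElement method; final so steps cannot bypass it.
  public final void processElement(I element, Output<O> out) {
    try {
      processElementImpl(element, out);
    } catch (Exception e) {
      // Any exception thrown by the step becomes a Failure on the failure output.
      out.failure(new Failure(stepName, String.valueOf(element), e.toString()));
    }
  }

  // Each step implements only its business logic and may throw freely.
  protected abstract void processElementImpl(I element, Output<O> out) throws Exception;
}

// Example step: parse a string into an int; unparseable input becomes a Failure.
class ParseIntFn extends SafeFn<String, Integer> {
  ParseIntFn() {
    super("ParseInt");
  }

  @Override
  protected void processElementImpl(String element, Output<Integer> out) {
    out.success(Integer.parseInt(element.trim()));
  }
}

// Collects both outputs in memory so the pattern can be exercised without a runner.
class CollectingOutput<O> implements Output<O> {
  final List<O> successes = new ArrayList<>();
  final List<Failure> failures = new ArrayList<>();

  @Override
  public void success(O value) {
    successes.add(value);
  }

  @Override
  public void failure(Failure failure) {
    failures.add(failure);
  }
}
```

Usage: feeding "1", " 42 ", and "abc" through new ParseIntFn() with a CollectingOutput<Integer> leaves successes equal to [1, 42] and a single Failure for "abc". In an actual Beam pipeline the base class would instead extend DoFn<InputT, OutputT>, carry @ProcessElement on its processElement(ProcessContext c), and emit failures with c.output(failureTag, ...); each step is still applied with ParDo.of(...).withOutputTags(successTag, TupleTagList.of(failureTag)), but the try/catch lives in one place. One caveat of the pattern: anything a step emits before it throws still reaches the success output.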
