GCP Dataflow Apache Beam: error handling for write output

Time: 2022-11-04 15:34:39

I need to apply error handling to my Dataflow pipeline for multiple inserts into Spanner with the same primary key. The reasoning is that an older message may be received after the current one, and I do not want it to overwrite the saved values. I will therefore create my mutation as an insert, so that an error is thrown when a duplicate insert is attempted.
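
For reference, an insert-only mutation step could look like the hypothetical sketch below of `createSpannerMutation`. The table `Messages`, its columns `MessageId` (primary key) and `Payload`, and the `KV<String, String>` input type are all assumptions for illustration. It uses `Mutation.newInsertBuilder` rather than `newInsertOrUpdateBuilder`, so a second write for an existing key is rejected by Spanner with `ALREADY_EXISTS` instead of overwriting the row, and it wraps each mutation in a `MutationGroup` because the write step in the question uses `.grouped()`.

```java
import com.google.cloud.spanner.Mutation;
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Hypothetical stand-in for createSpannerMutation: one insert-only MutationGroup per message. */
public class CreateInsertMutationFn extends DoFn<KV<String, String>, MutationGroup> {

  // Assumed table name, for illustration only.
  static final String TABLE = "Messages";

  /** Builds an INSERT (not INSERT_OR_UPDATE), so Spanner rejects a duplicate primary key. */
  static MutationGroup toInsertGroup(String messageId, String payload) {
    Mutation insert =
        Mutation.newInsertBuilder(TABLE)
            .set("MessageId").to(messageId) // assumed primary-key column
            .set("Payload").to(payload)     // assumed value column
            .build();
    // SpannerIO.write().grouped() expects MutationGroup elements.
    return MutationGroup.create(insert);
  }

  @ProcessElement
  public void processElement(
      @Element KV<String, String> message, OutputReceiver<MutationGroup> out) {
    out.output(toInsertGroup(message.getKey(), message.getValue()));
  }
}
```

Keeping the mutation construction in a static helper makes it easy to check the generated mutation without running a pipeline.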

I have seen several examples of try blocks within DoFns that write to a side output to log any errors. This is a very nice solution, but I need to apply error handling to the step that writes to Spanner, which does not contain a DoFn:

spannerBranchTuples2.get(spannerOutput2)
    .apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))                      
    .apply("Write Spanner Records", SpannerIO.write()
        .withInstanceId(options.getSpannerInstanceId())                  
        .withDatabaseId(options.getSpannerDatabaseId())
        .grouped());

I have not found any documentation describing how to apply error handling to this step, nor a way to rewrite it as a DoFn. Any suggestions on how to apply error handling here? Thanks.

1 solution

#1

There is a useful pattern for this in the Dataflow documentation.

The idea is to run a DoFn before your write transform: it catches exceptions and routes each failing element to a dead-letter output, while successful results continue to the sink. It looks something like this:

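import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Input, Output and process() stand for your own types and logic; LOG is an SLF4J Logger.
// The anonymous subclasses ({}) keep the generic type so Beam can infer each output's coder.
final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      // Main output: the successfully processed element.
      c.output(process(c.element()));
    } catch (Exception e) {
      LOG.error("Failed to process input {} -- adding to dead letter output",
          c.element(), e);
      // Additional (dead-letter) output; sideOutput() was renamed to output() in Beam 2.x.
      c.output(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));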

outputTuple.get(deadLetterTag)
  .apply(/* Write to a file or table or anything */);

outputTuple.get(successTag)
  .apply(/* Write to Spanner or any other sink */);
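
One caveat, assuming the write uses Beam's `SpannerIO`: a DoFn placed before the write only catches exceptions thrown while building the mutations. A duplicate-key insert is rejected by Spanner inside `SpannerIO.write()` itself, so it never reaches that try block. `SpannerIO` has its own setting for this case: with `withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)`, mutation groups that fail to write are emitted on `SpannerWriteResult.getFailedMutations()` instead of failing the pipeline (the default is `FAIL_FAST`). The sketch below applies this to the grouped write from the question; the class and method names are hypothetical, and formatting failures as strings is just one way to feed a dead-letter sink.

```java
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Hypothetical helper: grouped Spanner write that reports failed mutation groups. */
public class SpannerDeadLetter {

  static PCollection<String> writeAndCollectFailures(
      PCollection<MutationGroup> mutationGroups, String instanceId, String databaseId) {
    SpannerWriteResult result =
        mutationGroups.apply(
            "Write Spanner Records",
            SpannerIO.write()
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId)
                // Emit failed groups instead of failing the pipeline (default: FAIL_FAST).
                .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)
                .grouped());

    // Failed groups (e.g. inserts rejected with ALREADY_EXISTS), as strings for a dead-letter sink.
    return result
        .getFailedMutations()
        .apply(
            "Format Failed Mutations",
            MapElements.into(TypeDescriptors.strings()).via((MutationGroup g) -> g.toString()));
  }
}
```

Assuming the options getters return plain Strings, the question's final step would become something like `writeAndCollectFailures(mutationGroups, options.getSpannerInstanceId(), options.getSpannerDatabaseId())`, where `mutationGroups` is the output of "Create Spanner Mutation". The returned collection can then go to the same dead-letter sink as the `deadLetterTag` output above.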

Let me know if this is useful!