Unexpected behavior change from Dataflow 1.9 to 2.0/2.1

Time: 2022-11-01 15:35:47

We found a very strange difference between Dataflow SDK 1.9 and 2.0/2.1 for a very simple pipeline.

We have a CoGroupByKey step that joins two PCollections by their keys and outputs two PCollections (via TupleTags). For instance, one PCollection may contain {"str1", "str2"} and the other may contain {"str3"}.

These two PCollections are written to GCS (at different locations), and their union (basically, the PCollection produced by applying Flatten to the two PCollections) is used by subsequent steps in the pipeline. Using the previous example, we will store {"str1", "str2"} and {"str3"} in GCS under their respective locations, and the pipeline will further transform their union (the flattened PCollection) {"str1", "str2", "str3"}, and so on.

In Dataflow SDK 1.9, that is exactly what happens, and we've built our pipelines around this logic. As we were slowly migrating to 2.0/2.1, we noticed that this behavior is no longer observed. Instead, all the steps that follow the Flatten step run correctly and as expected, but the two PCollections being flattened are no longer written to GCS, as if they were nonexistent. In the execution graph, though, the steps are shown, which is very strange to us.

We were able to reproduce this issue reliably so that we can share the data and code as an example. We have two text files stored in GCS:

data1.txt:

k1,v1
k2,v2

data2.txt:

k2,w2
k3,w3

We will read these two files to create two PCollections, a PC for each file. We'll parse each line to create KV<String, String> (so the keys are k1, k2, k3 in this example).

We then apply CoGroupByKey and produce PCollections to be output to GCS. Two PCollections are produced after the CoGroupByKey step depending on whether the number of values associated with each key is even or odd (it's a contrived example, but it demonstrates the issue we are experiencing). So one of the PCs will contain the keys "k1" and "k3" (with some value strings appended to them, see the code below) as they have one value each, and the other will contain the single key "k2" as it has two values (one found in each file).

These two PCs are written to GCS at different locations, and the flattened PC of the two will also be written to GCS (but it could have been further transformed).

The three output files are expected to contain the following contents (rows may not be in order):

output1:

k2: [v2],(w2)

output2:

k3: (w3)
k1: [v1]

outputMerged:

k3: (w3)
k2: [v2],(w2)
k1: [v1]

This is exactly what we see (and expected) in Dataflow SDK 1.9.

In 2.0 and 2.1, however, output1 and output2 come out empty (and the TextIO steps are not even executed, as if no elements are being input to them; we verified this by adding a dummy ParDo in between, and it is not invoked at all).

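To give a concrete picture of that verification: the dummy ParDo was simply an identity transform with a log statement. A minimal sketch (the class name and log text here are illustrative, not part of the repro code below):

// Identity DoFn inserted only to check whether any elements reach the write steps;
// under 2.0/2.1 it is never invoked on the output1/output2 branches.
static class PassThroughFn extends DoFn<String, String> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    System.out.println("PassThroughFn saw: " + c.element());
    c.output(c.element());
  }
}

// Usage: output1.apply(ParDo.of(new PassThroughFn())).apply(TextIO.write()...);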

This makes us very curious as to why this behavior change was suddenly made between 1.9 and 2.0/2.1, and what the best way would be for us to achieve what we have been doing with 1.9. Specifically, we produce output1/output2 for archiving purposes, while we flatten the two PCs to transform the data further and produce another output.

Here is Java code you can run (you will have to import properly, change the bucket name, set Options properly, etc.).

Working code for 1.9:

//Dataflow SDK 1.9 compatible.
public class TestJob {
  public static void execute(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    PCollection<KV<String, String>> data1 =
        pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));

    PCollection<KV<String, String>> data2 =
        pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));

    TupleTag<String> inputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    TupleTag<String> outputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
        .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
            .withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
    PCollection<String> output1 = tuple.get(outputTag1);
    PCollection<String> output2 = tuple.get(outputTag2);
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());

    outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
    output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
    output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));

    pipeline.run();
  }

  static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
    private static final long serialVersionUID = 1L;

    final TupleTag<String> inputTag1;
    final TupleTag<String> inputTag2;
    final TupleTag<String> outputTag2;

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
      this.inputTag1 = inputTag1;
      this.inputTag2 = inputTag2;
      this.outputTag2 = outputTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String key = c.element().getKey();
      List<String> values = new ArrayList<String>();
      int numValues = 0;
      for (String val1 : c.element().getValue().getAll(inputTag1)) {
        values.add(String.format("[%s]", val1));
        numValues++;
      }
      for (String val2 : c.element().getValue().getAll(inputTag2)) {
        values.add(String.format("(%s)", val2));
        numValues++;
      }
      final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
      if (numValues % 2 == 0) {
        c.output(line);
      } else {
        c.sideOutput(outputTag2, line);
      }
    }
  }

  static class doFn extends DoFn<String, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String[] tokens = c.element().split(",");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }
}

Working code for 2.0/2.1:

// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
  public static void execute(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    PCollection<KV<String, String>> data1 =
        pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));

    PCollection<KV<String, String>> data2 =
        pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));

    TupleTag<String> inputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    TupleTag<String> outputTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outputTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
        .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
            .withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
    PCollection<String> output1 = tuple.get(outputTag1);
    PCollection<String> output2 = tuple.get(outputTag2);
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());

    outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
    output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
    output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));

    PipelineResult pipelineResult = pipeline.run();
    pipelineResult.waitUntilFinish();

  }

  static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
    private static final long serialVersionUID = 1L;

    final TupleTag<String> inputTag1;
    final TupleTag<String> inputTag2;
    final TupleTag<String> outputTag2;

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
      this.inputTag1 = inputTag1;
      this.inputTag2 = inputTag2;
      this.outputTag2 = outputTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String key = c.element().getKey();
      List<String> values = new ArrayList<String>();
      int numValues = 0;
      for (String val1 : c.element().getValue().getAll(inputTag1)) {
        values.add(String.format("[%s]", val1));
        numValues++;
      }
      for (String val2 : c.element().getValue().getAll(inputTag2)) {
        values.add(String.format("(%s)", val2));
        numValues++;
      }
      final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
      if (numValues % 2 == 0) {
        c.output(line);
      } else {
        c.output(outputTag2, line);
      }
    }
  }

  static class doFn extends DoFn<String, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      String[] tokens = c.element().split(",");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }
}

Also, in case it is useful, the execution graphs look like this. (For Google engineers, the job IDs are also specified.)

With 1.9 (job id 2017-09-29_14_35_42-15149127992051688457): [execution graph screenshot]

With 2.1 (job id 2017-09-29_14_31_59-991964669451027883): [execution graph screenshot]

TextIO.Write 2 and 3 are not producing any output under 2.0/2.1. Flatten and its subsequent step work fine.

1 solution

#1


This is indeed a defect. A fix is in flight and should be documented as available in the Service Release Notes.

A workaround in the meantime is to use the 1.9.1 SDK, as this error only affects 2.x SDKs.

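If you are using Maven, for example, pinning back to the 1.9.1 SDK should just be a dependency change; a sketch, assuming the standard Dataflow 1.x Java SDK artifact:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>1.9.1</version>
</dependency>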

Users interested in picking up the fix early can also use the latest nightly build from Beam (recommended to unblock development, not for production, since it's a daily build). Instructions here.
