Issues with stateful processing in Apache Beam

Asked: 2022-02-17 15:18:44

So I've read both Beam's stateful processing and timely processing articles, but ran into issues implementing the functions themselves.


The problem I am trying to solve is similar to this: generating a sequential index for every line, since I want to be able to map each line produced by Dataflow back to the corresponding line in the original source.


public static class createIndex extends DoFn<String, KV<String, String>> {
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState)  {

    String val = c.element();
    long count = 0L;
    if(countState.read() != null)
      count = countState.read();

    count = count + 1;
    countState.write(count);

    c.output(KV.of(String.valueOf(count), val));

  }
}

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new createIndex()));

I followed whatever I could find online, looked at the raw source code of ParDo, and was still not sure what needed to be done. The error I am getting is:


java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.

I looked up examples here and here.


I realize this is a simple problem, but due to a lack of examples and documentation I was unable to fix it. I'd appreciate any help. Thanks!


1 Answer

#1

Ok, so I kept working on the problem, read some more source code, and was able to resolve it. It turns out that a ParDo.of(new DoFn()) that uses state requires its input to be a KV<K, V>. Therefore, in order to read the file and create an index for each line, I first need to pass every element through as a key-value pair. Below is the code I added:


public static class FakeKvPair extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Pair every line with the same dummy key so the downstream
    // stateful DoFn sees a KV-coded input.
    c.output(KV.of("", c.element()));
  }
}

And changed the pipeline to:


Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new FakeKvPair()))
 .apply(ParDo.of(new createIndex()));
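As a side note, based on my reading of the SDK (double-check against the Beam version you use): the SDK already ships a transform that does the same wrapping as FakeKvPair, so the helper DoFn can probably be replaced with WithKeys:

```java
// Sketch: pairs every element with the constant key "", like FakeKvPair does.
// WithKeys lives in org.apache.beam.sdk.transforms.
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(WithKeys.of(""))
 .apply(ParDo.of(new createIndex()));
```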

The new problem is whether the order of the lines is preserved, since I am now running everything through an extra ParDo that might change the order of the lines being fed to createIndex().


On my local machine the order is preserved, but I don't know how that will scale on Dataflow. I will ask that as a separate question.

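For anyone wondering why Beam insists on the KV shape at all: as far as I can tell from the docs, each state cell is partitioned per key (and window), so the runner needs a key to know which copy of the state to read and write. A plain-Java sketch of that mental model (not Beam API, just an illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: Beam's ValueState<Long> behaves roughly like one
// Long slot per key, so an unkeyed input gives the runner no slot to pick.
public class PerKeyCount {
    private final Map<String, Long> state = new HashMap<>();

    public long increment(String key) {
        // merge: start at 1 for a new key, otherwise add 1 to the stored value
        return state.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        PerKeyCount counter = new PerKeyCount();
        System.out.println(counter.increment(""));  // 1
        System.out.println(counter.increment(""));  // 2
    }
}
```

With a single constant key like "" (as in FakeKvPair above), every element lands in the same state slot, which is what makes the running count work.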
