How to flatten windowed PCollections in Apache Beam? [Cloud Dataflow]

Time: 2021-11-23 14:40:20

I am trying to stream data from Pub/Sub to Datastore using Dataflow. I looked through the Google-provided templates: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

I noticed that the PubsubToDatastore template doesn't work, so I tried to debug it: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToDatastore.java

Here is what I did:

  • Added an errorTag.
  • Added window processing (Pub/Sub produces unbounded data, and Datastore can't accept unbounded data).
  • Added Flatten (I couldn't find a way to write windowed data to Datastore, so I thought I had to un-window it).

Here is my code:

    package com.google.cloud.teleport.templates;

    import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
    import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
    import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
    import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // added for errorTag
    import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
    import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
    import org.apache.beam.sdk.values.TupleTag;

    // added for window
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.beam.sdk.values.PCollectionTuple;

    import org.joda.time.Duration;

    public class PubsubToDatastore {
      interface PubsubToDatastoreOptions extends
          PipelineOptions,
          PubsubReadOptions,
          JavascriptTextTransformerOptions,
          DatastoreWriteOptions,
          ErrorWriteOptions {} // added

      public static void main(String[] args) {
        PubsubToDatastoreOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(PubsubToDatastoreOptions.class);

        TupleTag<String> errorTag = new TupleTag<String>("errors"){};

        Pipeline pipeline = Pipeline.create(options);

        pipeline
            .apply("Read Pubsub Events", PubsubIO.readStrings().fromTopic(options.getPubsubReadTopic()))
            .apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
            .apply("Flatten", Flatten.pCollections())
            .apply("Transform text to json", TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
            .apply(WriteJsonEntities.newBuilder()
                .setProjectId(options.getDatastoreWriteProjectId())
                .setErrorTag(errorTag)
                .build())
            .apply(LogErrors.newBuilder()
                .setErrorWritePath(options.getErrorWritePath())
                .setErrorTag(errorTag)
                .build());

        pipeline.run();
      }
    } 

When I run this code, the following error occurs:

    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 11.054 s
    [INFO] Finished at: 2018-08-20T17:55:49+09:00
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.2:compile (default-compile) on project google-cloud-teleport-java: Compilation failure
    [ERROR] /Users/shinya.yaginuma/work/DataflowTemplates/src/main/java/com/google/cloud/teleport/templates/PubsubToDatastore.java:[80,9] can not find an appropriate method for apply(java.lang.String,org.apache.beam.sdk.transforms.Flatten.PCollections<java.lang.Object>)
    [ERROR]     method org.apache.beam.sdk.values.PCollection.<OutputT>apply(org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<java.lang.String>,OutputT>) can't use
    [ERROR]       (Unable to infer the type variable OutputT
    [ERROR]         (The actual argument list and dummy argument list have different lengths))
    [ERROR]     method org.apache.beam.sdk.values.PCollection.<OutputT>apply(java.lang.String,org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<java.lang.String>,OutputT>) can't use
    [ERROR]       (Since there is no instance of type variable T, org.apache.beam.sdk.transforms.Flatten.PCollections is not fit for  org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<java.lang.String>,OutputT>)

What should I do next? Please give me some advice. Regards.

1 solution

#1


I am not sure why you want to Flatten the collection after the windowing. I am guessing the Flatten operation doesn't really do what you think it does.

Here is what it says it does:

Returns a {@link PTransform} that flattens a {@link PCollectionList} into a {@link PCollection} containing all the elements of all the {@link PCollection}s in its input.

Flatten takes multiple PCollections bundled into a PCollectionList and returns a single PCollection containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.
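
The "list of lists" analogy can be sketched in plain Java. This is only an illustration using java.util collections, not Beam code; the class name FlattenAnalogy and the helper flatten are hypothetical names chosen for the example. Unlike a List, a PCollection has no guaranteed element order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy only: this does not use Beam.
// FlattenAnalogy and flatten are hypothetical names for illustration.
public class FlattenAnalogy {

  // Concatenates every inner list into one list, preserving order.
  public static <T> List<T> flatten(List<List<T>> lists) {
    return lists.stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<List<String>> sources = Arrays.asList(
        Arrays.asList("a1", "a2"),
        Arrays.asList("b1"));
    System.out.println(flatten(sources)); // prints [a1, a2, b1]
  }
}
```

The input here is a list of lists, just as Flatten's input is a PCollectionList; a single list (or a single PCollection) has nothing to flatten.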

If, for instance, you have multiple PCollections from different sources and you want to merge them into one PCollection, then Flatten is your tool. In this scenario you only have a single PCollection (not a PCollectionList, i.e. a list of PCollections), so Flatten does not apply, which is why the compiler rejects it. The first step gives you an unbounded PCollection<String> from PubsubIO.readStrings(); Window.into(...) then assigns each element of it to a fixed five-minute window. The result is still a single PCollection<String>, now divided into finite windows, so there is nothing to flatten.
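
For contrast, here is a minimal sketch of the case Flatten is meant for: two PCollections of the same element type bundled into a PCollectionList. The class name FlattenSketch, the helper merge, and the in-memory Create.of inputs are assumptions made for illustration and are not part of the template. Running main assumes the Beam Java SDK and a runner such as the DirectRunner are on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Hypothetical example class; not part of DataflowTemplates.
public class FlattenSketch {

  // Bundles two PCollections into a PCollectionList and flattens them
  // into a single PCollection containing the elements of both.
  public static PCollection<String> merge(
      PCollection<String> first, PCollection<String> second) {
    return PCollectionList.of(first).and(second)
        .apply("Flatten", Flatten.<String>pCollections());
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Stand-ins for two different sources (assumed for illustration).
    PCollection<String> sourceA = pipeline.apply("SourceA", Create.of("a1", "a2"));
    PCollection<String> sourceB = pipeline.apply("SourceB", Create.of("b1"));

    // One PCollection<String> holding a1, a2 and b1 (in no particular order).
    PCollection<String> merged = merge(sourceA, sourceB);

    pipeline.run().waitUntilFinish();
  }
}
```

Note that apply is called on the PCollectionList, not on a PCollection. Calling it on a PCollection<String>, as in the question, is what produces the compilation failure.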

I suggest you simply remove the .apply("Flatten", Flatten.pCollections()) line and run your pipeline again. It looks fine otherwise.
