How to use Google Pub/Sub and Google Dataflow/Beam with Python?

Posted: 2023-02-09 15:19:33

I'm new to Pub/Sub and Dataflow/Beam. I have done a task in Spark and Kafka, and I want to do the same using Pub/Sub and Dataflow/Beam. From what I understand so far, Kafka is similar to Pub/Sub and Spark is similar to Dataflow/Beam.

The task is to take a JSON file and write it to a Pub/Sub topic. Then, using Beam/Dataflow, I need to get that data into a PCollection. How can I achieve this?

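For the first half (JSON file -> Pub/Sub) I'm assuming something like the following sketch with the google-cloud-pubsub client would work (the project, topic, and file names are placeholders, and it assumes one JSON object per line); it's the Pub/Sub -> PCollection part I'm unsure about:

import json
from google.cloud import pubsub_v1

# Placeholder project/topic/file names -- replace with your own.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')

with open('input.json') as f:
    for line in f:
        if line.strip():
            # Pub/Sub message payloads must be bytes.
            publisher.publish(topic_path, json.dumps(json.loads(line)).encode('utf-8'))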

2 Solutions

#1


2  

I solved the above problem. I'm able to continuously read data from a Pub/Sub topic, do some processing, and then write the result to Datastore.

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore

# jsonParse, EntityWrapper, config, known_args and options are defined elsewhere
# in my code; jsonParse is a DoFn that emits (key, value) pairs so that
# CombinePerKey below can aggregate per key.
with beam.Pipeline(options=options) as p:

    # Read from Pub/Sub into a PCollection of strings.
    # (Newer Beam SDKs replace this with beam.io.ReadFromPubSub, which yields bytes.)
    lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)

    # Split, parse each JSON object, window, and aggregate per key.
    transformed = (lines
                   | 'Split' >> beam.FlatMap(lambda x: x.split("\n"))
                   | 'jsonParse' >> beam.ParDo(jsonParse())
                   | 'Window' >> beam.WindowInto(window.FixedWindows(15, 0))
                   | 'Combine' >> beam.CombinePerKey(sum))

    # Wrap each aggregated record in a Datastore entity.
    transformed = transformed | 'create entity' >> beam.Map(
        EntityWrapper(config.NAMESPACE, config.KIND, config.ANCESTOR).make_entity)

    # Write the entities to Datastore.
    transformed | 'write to datastore' >> WriteToDatastore(known_args.dataset_id)
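Note that the options object used above needs streaming enabled for the continuous Pub/Sub read. A minimal sketch of how it can be built (the project and runner values are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder values -- substitute your own project/runner settings.
options = PipelineOptions(['--project=my-project', '--runner=DataflowRunner'])
options.view_as(StandardOptions).streaming = True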

#2


1  

Pub/Sub is a streaming source/sink (it doesn't make sense to read/write to it only once). Streaming support in the Dataflow Python SDK is not yet available.

Documentation: https://cloud.google.com/dataflow/release-notes/release-notes-python.

Once streaming is available, you should be able to do this pretty trivially.

However, if you are going from file -> Pub/Sub and then Pub/Sub -> PCollection, you should be able to do this with a batch pipeline and drop the Pub/Sub aspect entirely. You can look at the basic file IO for Beam; a sketch follows below.

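For example, a minimal batch sketch that skips Pub/Sub and reads the JSON file straight into a PCollection (the file name is a placeholder, and it assumes one JSON object per line):

import json
import apache_beam as beam

with beam.Pipeline() as p:
    records = (p
               | 'Read' >> beam.io.ReadFromText('input.json')  # placeholder file name
               | 'Parse' >> beam.Map(json.loads))               # parse each line into a dict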
