Apache Beam Pipeline Query Table After Writing Table

Time: 2022-01-23 15:36:44

I have an Apache Beam/Dataflow pipeline that writes results to a BigQuery table. I would then like to query this table in a separate portion of the pipeline. However, I can't figure out how to set up this pipeline dependency properly. The new table that I write (and then want to query) is left-joined with a separate table for some filtering logic, which is why I actually need to write the table and then run the query. The logic is as follows:

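```python
import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:
    # ... logic to generate table (elided) ...
    table_data = p | 'CreateTable' >> ...

    # Write the table to BigQuery
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)

    # This line fails: a Read is a root transform and must be applied to the
    # pipeline itself (a PBegin), not to the output of WriteToBigQuery.
    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
```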

If `query_new_table` queries an already existing BQ table and I change the last line to `query_results = p | ...` instead of `table_written | ...`, this works properly. However, if I try to query the table that I am writing in the middle of the pipeline, I cannot get that pipeline step to "wait" until the table has actually been written. Is there a way to do this that I am overlooking?

When I try to make this step sequential, I get an assertion error, `assert isinstance(pbegin, pvalue.PBegin) AssertionError`, which I read to mean that `table_written` is the issue because it is not a valid PCollection instance.

Does anybody know what I could put in place of `table_written` to make this run sequentially as desired?

1 Answer


Beam does not currently support the use case "do something after a BigQuery write is complete". The only workaround is to run separate pipelines, with your main program doing the following: run the pipeline that writes to BigQuery; wait for that pipeline to finish; then run another pipeline that reads from BigQuery.

This is a very frequently requested feature, and we're beginning to design support for it (more generally, upgrading various IO writes to support sequencing operations after them), but I don't know when it will be done.