How do I write to BigQuery using a schema computed during execution of the same Dataflow pipeline?

Date: 2022-05-09 14:38:56

My scenario is a variation on the one discussed here: How do I write to BigQuery using a schema computed during Dataflow execution?


In this case, the goal is the same (read a schema during execution, then write a table with that schema to BigQuery), but I want to accomplish it within a single pipeline.


For example, I'd like to write a CSV file to BigQuery and avoid fetching the file twice (once to read schema, once to read data).


Is this possible? If so, what's the best approach?



My current best guess is to read the schema into a PCollection via a side output and then use that to create the table (with a custom PTransform) before passing the data to BigQueryIO.Write.

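For concreteness, here's a minimal sketch of that split using side outputs (Apache Beam Java SDK naming; the original Dataflow SDK API was similar). `lines` is assumed to be the PCollection of raw CSV lines and `looksLikeHeader` is a hypothetical predicate; this shows only the split, not how to sequence table creation before the write:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Route the header line to a side output and all data lines to the main output.
final TupleTag<String> dataTag = new TupleTag<String>() {};
final TupleTag<String> headerTag = new TupleTag<String>() {};

PCollectionTuple split = lines.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        if (looksLikeHeader(c.element())) {   // hypothetical predicate
          c.output(headerTag, c.element());   // schema side output
        } else {
          c.output(c.element());              // data main output
        }
      }
    }).withOutputTags(dataTag, TupleTagList.of(headerTag)));

PCollection<String> data = split.get(dataTag);
PCollection<String> header = split.get(headerTag); // feed this to the table-creating PTransform
```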

1 Answer

#1



If you use BigQueryIO.Write to create the table, then the schema needs to be known when the table is created.


Your proposed solution of not specifying the schema when you create the BigQueryIO.Write transform might work, but you might get an error because the table doesn't exist and you aren't configuring BigQueryIO.Write to create it if needed.

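For reference, a sketch of the configuration that sentence refers to (Beam Java SDK naming; the table name is hypothetical). Letting the sink create the table requires supplying both a create disposition and a schema when the transform is constructed:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;

// Letting the sink create the table only works if a schema is supplied up front.
BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.my_table")          // hypothetical destination
    .withSchema(schema)                            // must be known at construction time
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
```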

You might want to consider reading just enough of your CSV files in your main program to determine the schema before running your pipeline. This would avoid the complexity of determining the schema at runtime. You would still incur the cost of the extra read but hopefully that's minimal.

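A minimal sketch of that approach with the Apache Beam Java SDK, assuming a local CSV path and a hypothetical destination table; for a file on GCS you would read just the first bytes through the GCS client instead:

```java
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class CsvToBigQuery {
  public static void main(String[] args) throws Exception {
    String csvPath = "/path/to/input.csv";  // hypothetical input path

    // Read only the header line in the main program to build the schema.
    String header;
    try (BufferedReader reader = Files.newBufferedReader(Paths.get(csvPath))) {
      header = reader.readLine();
    }
    String[] columns = header.split(",");
    List<TableFieldSchema> fields = new ArrayList<>();
    for (String column : columns) {
      // Without sampling values we can only guess types; STRING is the safe default.
      fields.add(new TableFieldSchema().setName(column.trim()).setType("STRING"));
    }
    TableSchema schema = new TableSchema().setFields(fields);

    // The schema is now an ordinary value, known at pipeline construction time.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(TextIO.read().from(csvPath))
        .apply(Filter.by(line -> !line.equals(header)))  // drop the header row
        .apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(line -> {
          String[] values = line.split(",");
          TableRow row = new TableRow();
          for (int i = 0; i < columns.length && i < values.length; i++) {
            row.set(columns[i].trim(), values[i]);
          }
          return row;
        }))
        .apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")  // hypothetical destination
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
    p.run();
  }
}
```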

Alternatively, you could create a custom sink to write your data to BigQuery. Your sink could write the data to GCS, and its finalize method could then create a BigQuery load job. The custom sink could infer the schema by looking at the records and create the BigQuery table with the appropriate schema.

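A rough sketch of the finalize step only, using the low-level BigQuery API client; the surrounding custom Sink plumbing and the per-record schema inference are omitted, and the dataset/table names are hypothetical:

```java
import java.io.IOException;
import java.util.List;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;

class LoadJobFinalizer {
  /** Submits one load job covering every shard the sink wrote to GCS. */
  static void finalizeWrite(Bigquery bigquery, String projectId,
      List<String> shardUris, TableSchema inferredSchema) throws IOException {
    JobConfigurationLoad load = new JobConfigurationLoad()
        .setSourceUris(shardUris)                   // gs://... files written by the sink
        .setSourceFormat("NEWLINE_DELIMITED_JSON")  // assuming the sink wrote JSON records
        .setSchema(inferredSchema)                  // schema inferred from the records
        .setCreateDisposition("CREATE_IF_NEEDED")   // create the table with that schema
        .setDestinationTable(new TableReference()
            .setProjectId(projectId)
            .setDatasetId("my_dataset")             // hypothetical destination
            .setTableId("my_table"));
    Job job = new Job().setConfiguration(new JobConfiguration().setLoad(load));
    bigquery.jobs().insert(projectId, job).execute();
  }
}
```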
