Elements accumulating in the GroupByKey subtask when writing to BigQuery with Apache Beam v2.0

Time: 2022-05-09 14:38:50

I'm implementing a Dataflow pipeline that reads messages from Pubsub and writes TableRows into BigQuery (BQ) using Apache Beam SDK 2.0.0 for Java.

This is the related portion of code:

    tableRowPCollection
        .apply(BigQueryIO.writeTableRows().to(this.tableId)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

This code generates a group of tasks under the hood in the Dataflow pipeline. One of these tasks is the GroupByKey, and it keeps accumulating elements in the pipeline, as can be seen in this screenshot: GBK elements accumulation image. After reading the docs I suspect the issue relates to the Window configuration, but I could not find a way to modify it since it is implicitly created by the Window.Assign transform inside the Reshuffle task.

Is there a way of setting the window parameters and/or attaching triggers to this implicit Window or should I create my own DoFn that inserts a TableRow in BQ?

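For reference, the kind of explicit windowing I have in mind would look roughly like the sketch below, applied to my own PCollection before the write. The window size, trigger delay, and allowed lateness are just placeholder values, and as far as I understand this would not change the implicit Window.Assign inside the Reshuffle.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // Sketch only: window/trigger applied to my own PCollection before the write.
    // The durations below are placeholders, not recommendations.
    tableRowPCollection
        .apply("WindowRows",
            Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30))))
                .withAllowedLateness(Duration.standardMinutes(5))
                .discardingFiredPanes())
        .apply(BigQueryIO.writeTableRows().to(this.tableId)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));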

Thanks in advance!


[Update]

I left the pipeline running for approximately a day, and after that the GroupByKey subtask became faster and the numbers of elements coming in and coming out got closer to each other (sometimes they were the same). Furthermore, I also noticed that the Watermark got closer to the current date and was advancing faster. So the "issue" was solved.


1 solution

#1


There isn't any waiting introduced by the Reshuffle in the BigQuery sink. Rather, it is used to create the batches of rows to write to BigQuery. The number of elements coming out of the GroupByKey is smaller because each output element represents a batch (or group) of input elements.

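As a toy illustration of that counting (not the sink's actual code): five elements spread over two keys go into a GroupByKey and only two elements come out, one Iterable batch per key, so the "elements output" counter is naturally smaller than "elements added".

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Five input elements over two keys...
    PCollection<KV<String, Iterable<Integer>>> batches =
        p.apply(Create.of(
                KV.of("shard-0", 1), KV.of("shard-0", 2), KV.of("shard-0", 3),
                KV.of("shard-1", 4), KV.of("shard-1", 5)))
         // ...become two output elements, one batch (Iterable) per key.
         .apply(GroupByKey.<String, Integer>create());

    p.run().waitUntilFinish();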
You should be able to see the total number of elements coming out as the output of the ExpandIterable (the output of the Reshuffle).
