Apache Beam窗口化和分片BigQuery输出表

时间:2021-12-01 14:18:39

My use case is simple: read event logs from Pub/Sub subscription, parse them and save into BigQuery. Because the number of events is expected to grow significantly and I work with unbounded data source I decided to configure sharding in BigQuery: store events into daily tables based on timestamp from the event data (what is called "event time" in the Beam documentation). The question I have is do I need to configure windowing in my case or I can just leave the default configuration which implicitly uses global window? The reason I'm asking is because most of the examples of BigQuery sharding I found assume usage of windowing configuration. But in my case, since I'm not using any grouping operations as GroupByKey and Combine, looks like I should be just fine without any windowing configuration. Or are there any reasons for me to use windowing anyway, maybe it affects how BigQueryIO performs for example?

我的用例很简单:从Pub / Sub订阅中读取事件日志,解析它们并保存到BigQuery中。因为事件的数量预计会显着增加而且我使用*数据源我决定在BigQuery中配置分片:根据事件数据的时间戳将事件存储到日常表中(Beam文档中称为“事件时间”) 。我的问题是我需要在我的情况下配置窗口,或者我可以保留隐式使用全局窗口的默认配置吗?我问的原因是因为我发现大多数BigQuery分片的例子都假设使用了窗口配置。但在我的情况下,由于我没有使用任何分组操作作为GroupByKey和Combine,看起来我应该没有任何窗口配置就好了。或者我有没有理由使用窗口,也许它会影响BigQueryIO的表现如何?

The way I do sharding now is below.

我现在进行分片的方式如下。

static class TableNamingFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

        TableReference reference = new TableReference();
        reference.setProjectId("test-project");
        reference.setDatasetId("event_log");

        DateTime timestamp = new DateTime(input.getValue().get("event_timestamp"), DateTimeZone.UTC);
        reference.setTableId("events_" + formatter.print(timestamp));
        return new TableDestination(reference, null);
    }
}

// And then
eventRows.apply("BigQueryWrite", BigQueryIO.writeTableRows()
        .to(new TableNamingFn())
        .withSchema(EventSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

1 个解决方案

#1


1  

It looks like you are trying to shard the table by date, have you considered using a Date-partitioned Table instead. You could update where you set your table id to using the partition decorator, something like:

看起来您正在尝试按日期对表进行分片,您是否考虑过使用日期分区表。您可以使用分区装饰器更新设置表ID的位置,例如:

reference.setTableId("events$" + formatter.print(timestamp));

This article covers using BigQuery's partitioned tables with Apache Beam. In particular this snippet of code is probably what you want to use: https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition

本文介绍如何在Apache Beam中使用BigQuery的分区表。特别是这段代码可能就是你想要使用的代码:https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition

#1


1  

It looks like you are trying to shard the table by date, have you considered using a Date-partitioned Table instead. You could update where you set your table id to using the partition decorator, something like:

看起来您正在尝试按日期对表进行分片,您是否考虑过使用日期分区表。您可以使用分区装饰器更新设置表ID的位置,例如:

reference.setTableId("events$" + formatter.print(timestamp));

This article covers using BigQuery's partitioned tables with Apache Beam. In particular this snippet of code is probably what you want to use: https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition

本文介绍如何在Apache Beam中使用BigQuery的分区表。特别是这段代码可能就是你想要使用的代码:https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition