Accessing TableRow columns in BigQuery Apache Beam

Date: 2021-10-03 17:21:40

I am trying to:

1. Read JSON events from Cloud Pub/Sub.

2. Load the events from Cloud Pub/Sub to BigQuery every 15 minutes, using file loads to save the cost of streaming inserts.

3. The destination will differ based on the "user_id" and "campaign_id" fields in the JSON event: "user_id" will be the dataset name and "campaign_id" will be the table name. The partition name comes from the event timestamp. (A sample event is sketched after this list.)

4. The schema for all tables stays the same.
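For concreteness, a hypothetical event matching these requirements might look like the following (field names come from the list above and the schema in the Update below; the values are invented):

// Routed to dataset "acme" and table "summer_sale"; the "timestamp" value
// determines the partition. All values here are made up for illustration.
String sampleEvent =
        "{\"user_id\": \"acme\", \"campaign_id\": \"summer_sale\","
        + " \"timestamp\": \"2021-10-03T17:21:40Z\", \"exchange\": \"adx\"}";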

I am new to Java and Beam. I think my code mostly does what I am trying to do and I just need a little help here.

But I am unable to access the "campaign_id" and "user_id" fields in the JSON message, so my events are not routing to the correct tables.

package ...;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

public class ClickLogConsumer {
    private static final int BATCH_INTERVAL_SECS = 15 * 60;
    private static final String PROJECT = "pure-app";

    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {

        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.<String, TableRow>via(
                    new SimpleFunction<String, TableRow>() {
                        @Override
                        public TableRow apply(String json) {
                            try {
                                InputStream inputStream = new ByteArrayInputStream(
                                        json.getBytes(StandardCharsets.UTF_8));

                                // OUTER is used here to prevent an EOF exception
                                return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                            } catch (IOException e) {
                                throw new RuntimeException("Unable to parse input", e);
                            }
                        }
                    }));
        }
    }


    public static void main(String[] args) throws Exception {
        // Build pipeline options (runner, project, etc.) from the command-line args.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(PubsubIO.readStrings().withTimestampAttribute("timestamp").fromTopic("projects/pureapp-199410/topics/clicks"))
                .apply(jsonToTableRow())
                .apply("WriteToBQ",
                        BigQueryIO.writeTableRows()
                                .withMethod(FILE_LOADS)
                                .withWriteDisposition(WRITE_APPEND)
                                .withCreateDisposition(CREATE_IF_NEEDED)
                                .withTriggeringFrequency(Duration.standardSeconds(BATCH_INTERVAL_SECS))
                                .withoutValidation()
                                .to(new DynamicDestinations<TableRow, String>() {
                                    @Override
                                    public String getDestination(ValueInSingleWindow<TableRow> element) {
                                        String tableName = "campaign_id"; // JSON message in Pub/Sub has "campaign_id" field, how do I access it here?
                                        String datasetName = "user_id"; // JSON message in Pub/Sub has "user_id" field, how do I access it here?
                                        Instant eventTimestamp = element.getTimestamp();
                                        String partition = new SimpleDateFormat("yyyyMMdd").format(eventTimestamp.toDate());
                                        return String.format("%s:%s.%s$%s", PROJECT, datasetName, tableName, partition);
                                    }

                                    @Override
                                    public TableDestination getTable(String table) {
                                        return new TableDestination(table, null);
                                    }

                                    @Override
                                    public TableSchema getSchema(String destination) {
                                        return getTableSchema();
                                    }
                                }));
        pipeline.run();
    }
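
    // Hypothetical helper (not shown in the original post): the shared schema
    // for all destination tables, using the two fields from the Update below.
    private static TableSchema getTableSchema() {
        return new TableSchema().setFields(ImmutableList.of(
                new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                new TableFieldSchema().setName("exchange").setType("STRING")));
    }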
}

I arrived at the above code based on reading:

1. https://medium.com/myheritage-engineering/kafka-to-bigquery-load-a-guide-for-streaming-billions-of-daily-events-cbbf31f4b737

2. https://shinesolutions.com/2017/12/05/fun-with-serializable-functions-and-dynamic-destinations-in-cloud-dataflow/

3. https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.html

4. BigQueryIO - Write performance with streaming and FILE_LOADS

5. Inserting into BigQuery via load jobs (not streaming)

Update

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

public class ClickLogConsumer {
    private static final int BATCH_INTERVAL_SECS = 15 * 60;
    private static final String PROJECT = "pure-app";

    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {

        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.<String, TableRow>via(
                    new SimpleFunction<String, TableRow>() {
                        @Override
                        public TableRow apply(String json) {
                            try {

                                InputStream inputStream = new ByteArrayInputStream(
                                        json.getBytes(StandardCharsets.UTF_8));

                                //OUTER is used here to prevent EOF exception
                                return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                            } catch (IOException e) {
                                throw new RuntimeException("Unable to parse input", e);
                            }
                        }
                    }));
        }
    }


    public static void main(String[] args) throws Exception {
        // Build pipeline options (runner, project, etc.) from the command-line args.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(PubsubIO.readStrings().withTimestampAttribute("timestamp").fromTopic("projects/pureapp-199410/topics/clicks"))
                .apply(jsonToTableRow())
                // writeTableRows() matches the PCollection<TableRow> input; the
                // generic write() would additionally require a format function.
                .apply(BigQueryIO.writeTableRows()
                        .withTriggeringFrequency(Duration.standardSeconds(BATCH_INTERVAL_SECS))
                        .withMethod(FILE_LOADS)
                        .withWriteDisposition(WRITE_APPEND)
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withSchema(new TableSchema().setFields(
                                ImmutableList.of(
                                        new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                                        new TableFieldSchema().setName("exchange").setType("STRING"))))
                        .to((row) -> {
                            String datasetName = row.getValue().get("user_id").toString();
                            String tableName = row.getValue().get("campaign_id").toString();
                            return new TableDestination(String.format("%s:%s.%s", PROJECT, datasetName, tableName), "Some destination");
                        })
                        .withTimePartitioning(new TimePartitioning().setField("timestamp")));
        pipeline.run();
    }
}

1 Answer

#1

How about: String tableName = element.getValue().get("campaign_id").toString() and likewise for the dataset.
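Applied to the first snippet, the getDestination override would then look roughly like this (a sketch of the suggestion; it assumes every event carries both fields, non-null):

@Override
public String getDestination(ValueInSingleWindow<TableRow> element) {
    // The decoded TableRow behaves like a Map keyed by column name,
    // so the routing fields can be read directly from the element.
    String tableName = element.getValue().get("campaign_id").toString();
    String datasetName = element.getValue().get("user_id").toString();
    Instant eventTimestamp = element.getTimestamp();
    String partition = new SimpleDateFormat("yyyyMMdd").format(eventTimestamp.toDate());
    return String.format("%s:%s.%s$%s", PROJECT, datasetName, tableName, partition);
}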

Besides, for inserting into time-partitioned tables, I strongly recommend using BigQuery's column-based partitioning, instead of using a partition decorator in the table name. Please see "Loading historical data into time-partitioned BigQuery tables" in the javadoc - you'll need a timestamp column. (Note that the javadoc has a typo: "time" vs "timestamp".)
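In code, that means dropping the $yyyyMMdd decorator from the table spec and declaring the partition column on the write instead, along these lines (a sketch; schema and destinationFn stand for the table schema and dynamic-destination function defined elsewhere):

BigQueryIO.writeTableRows()
        .withMethod(FILE_LOADS)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withTriggeringFrequency(Duration.standardSeconds(BATCH_INTERVAL_SECS))
        .withSchema(schema) // must include the "timestamp" TIMESTAMP column
        // BigQuery assigns each row to a partition from the value of this
        // column, so no partition decorator is needed in the table name.
        .withTimePartitioning(new TimePartitioning().setField("timestamp"))
        .to(destinationFn);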
