Writing to BigQuery from a local Dataflow job

Time: 2021-12-18 14:49:55

I have a Dataflow pipeline set up locally on my machine. It takes a sample newline-delimited file full of JSON objects, does its thing, and formats the end result as TableRows. When it is time to write to BigQuery, I don't know how to authenticate. I couldn't find anything in Dataflow's documentation or examples where one writes to BigQuery using a local pipeline. If possible, I would like to know how to do it. In my mind it should be something like either:

...
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write
      .withCredentials/Token(SOME_TOKEN)  // <- This line
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .to("project:db.table"));
...

or

...
PipelineOptions options = PipelineOptionsFactory.create();
options.setGoogleCloudCredentials/Token(SOME_TOKEN)  // <- This line
Pipeline p = Pipeline.create(options);
...

1 Answer

#1


Your second approach is the right one. It will look something like this:

GcpOptions gcpOptions = options.as(GcpOptions.class);
gcpOptions.setGcpCredential(...);
gcpOptions.setProject(...);
// etc

The idiom of options.as(SomeSpecificOptions.class) is worth remembering.

You'll want to read over GcpOptions to see the methods available.

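For a fuller picture, here is a minimal sketch of what the local pipeline could look like end to end, assuming a recent Apache Beam Java SDK (where setGcpCredential accepts a com.google.auth.Credentials such as GoogleCredentials; the older Dataflow 1.x SDK uses a different credential class). The project id, service account key path, temp bucket, and table name are placeholders, and the tiny Create.of source just stands in for the question's FormatAsTableRowFn output:

import java.io.FileInputStream;
import java.util.Collections;

import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.oauth2.GoogleCredentials;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class LocalWriteToBigQuery {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // View the generic options as GCP-specific options and attach credentials explicitly.
    GcpOptions gcpOptions = options.as(GcpOptions.class);
    gcpOptions.setProject("my-project");  // hypothetical project id
    gcpOptions.setGcpCredential(
        GoogleCredentials
            .fromStream(new FileInputStream("/path/to/service-account-key.json"))  // hypothetical key file
            .createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform")));

    // Batch loads into BigQuery stage files in a temp location (hypothetical bucket).
    options.setTempLocation("gs://my-bucket/tmp");

    Pipeline p = Pipeline.create(options);

    // Stand-in for the PCollection<TableRow> produced by FormatAsTableRowFn in the question.
    p.apply(Create.of(new TableRow().set("name", "example")).withCoder(TableRowJsonCoder.of()))
     .apply(BigQueryIO.writeTableRows()
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
         .to("my-project:my_dataset.my_table"));  // hypothetical table

    p.run().waitUntilFinish();
  }
}

As a side note, if you leave the credential unset, Beam falls back to Application Default Credentials (e.g. the GOOGLE_APPLICATION_CREDENTIALS environment variable or gcloud auth application-default login), which is often the simpler route for local runs.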
