通过Apache Beam写入动态BigQuery表

时间:2021-12-01 14:18:51

I am getting the BigQuery table name at runtime and I pass that name to the BigQueryIO.write operation at the end of my pipeline to write to that table.

我在运行时获取BigQuery表名称,并将该名称传递给管道末尾的BigQueryIO.write操作以写入该表。

The code that I've written for it is:

我为它编写的代码是:

rows.apply("write to BigQuery", BigQueryIO
    .writeTableRows()
    .withSchema(schema)
    .to("projectID:DatasetID."+tablename)
    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

With this syntax I always get an error,

使用这种语法我总是得到一个错误,

Exception in thread "main" java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format

How to pass the table name with the correct format when I don't know before hand which table it should put the data in? Any suggestions?

如果我不知道之前哪个表应该放入数据,如何传递具有正确格式的表名?有什么建议么?

Thank You

谢谢

1 个解决方案

#1


2  

Very late to the party on this however. I suspect the issue is you were passing in a string not a table reference.

然而,这方面很晚才到了。我怀疑问题是你传入的字符串不是表引用。

If you created a table reference I suspect you'd have no issues with the above code.

如果您创建了表引用,我怀疑您对上述代码没有任何问题。

com.google.api.services.bigquery.model.TableReference table = new TableReference()
            .setProjectId(projectID)
            .setDatasetId(DatasetID)
            .setTableId(tablename);

rows.apply("write to BigQuery", BigQueryIO
    .writeTableRows()
    .withSchema(schema)
    .to(table)
    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

#1


2  

Very late to the party on this however. I suspect the issue is you were passing in a string not a table reference.

然而,这方面很晚才到了。我怀疑问题是你传入的字符串不是表引用。

If you created a table reference I suspect you'd have no issues with the above code.

如果您创建了表引用,我怀疑您对上述代码没有任何问题。

com.google.api.services.bigquery.model.TableReference table = new TableReference()
            .setProjectId(projectID)
            .setDatasetId(DatasetID)
            .setTableId(tablename);

rows.apply("write to BigQuery", BigQueryIO
    .writeTableRows()
    .withSchema(schema)
    .to(table)
    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));