Programmatically generating a BigQuery schema in a Beam pipeline

Date: 2021-03-08 19:20:23

I have a collection of homogeneous dicts. How do I write them to BigQuery without knowing the schema?

The BigQuerySink requires that I specify the schema when I construct it. But I don't know the schema: it's defined by the keys of the dicts I'm trying to write.

Is there a way to have my pipeline infer the schema, and then provide it back (as a side input?) to the sink?

For example:

# Create a PCollection of dicts, something like
# {'field1': 'myval', 'field2': 10}
data = p | 'generate_data' >> beam.ParDo(CreateData())

# Infer the schema from the data
# Generates a string for each element (ok to assume all dict keys equal)
# "field1:STRING, field2:INTEGER"
schema = (data
  | 'infer_schema' >> beam.ParDo(InferSchema())
  | 'sample_one'   >> beam.combiners.Sample.FixedSizeGlobally(1))
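
One way the hypothetical InferSchema DoFn might look (it isn't defined in the post; a sketch assuming every value is a simple scalar):

import apache_beam as beam

class InferSchema(beam.DoFn):
    # Map Python value types to BigQuery column types; anything else
    # falls back to STRING. (Illustrative mapping only.)
    TYPE_MAP = {str: 'STRING', int: 'INTEGER', float: 'FLOAT', bool: 'BOOLEAN'}

    def process(self, element):
        yield ', '.join(
            '%s:%s' % (key, self.TYPE_MAP.get(type(value), 'STRING'))
            for key, value in sorted(element.items()))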

But then, how do I feed the schema as a parameter to the BigQuerySink, and use that in a beam.io.Write?

I know this isn't correct, but what I want to do is:

sink = BigQuerySink(tablename, dataset, project, schema=Materialize(schema))
p | 'write_bigquery' >> beam.io.Write(sink)

tl;dr Is there a way to create and write to a BigQuery table from Apache Beam, programmatically inferring the schema from the data?

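For reference, newer Apache Beam releases expose beam.io.WriteToBigQuery, which accepts a callable for schema together with a schema_side_inputs tuple, and that combination can express exactly this. A sketch, reusing the data and schema PCollections above (Sample.FixedSizeGlobally(1) yields a one-element list, hence the [0]):

import apache_beam as beam

schema_view = beam.pvalue.AsSingleton(schema)

(data | 'write_bigquery' >> beam.io.WriteToBigQuery(
    '%s:%s.%s' % (project, dataset, tablename),
    # The callable is invoked with the destination plus each side input;
    # here the side input simply supplies the inferred schema string.
    schema=lambda dest, sampled: sampled[0],
    schema_side_inputs=(schema_view,),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))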

2 Answers

#1

Assuming that your schema can change frequently, it may work better for you to keep the data in a more generic form.

For example, your row may consist of a single repeated record (your dictionary entries).

The record schema looks like: key (STRING) | optional string_val (STRING) | optional int_val (INTEGER) | optional double_val (DOUBLE) | optional boolean_val (BOOLEAN) | ...

Then you can write queries that scan your records by type. This is somewhat less efficient (because you'll have to scan rows that you might otherwise be able to skip if they were in different columns), but entirely avoids specifying your schema up-front.

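A sketch of that layout (the field names attrs, key, and *_val are illustrative, not from the answer):

def to_generic_row(element):
    # Fold an arbitrary dict into one row holding a single repeated
    # "attrs" record, so the table schema never needs to change.
    attrs = []
    for key, value in element.items():
        entry = {'key': key}
        if isinstance(value, bool):      # bool first: bool subclasses int
            entry['boolean_val'] = value
        elif isinstance(value, int):
            entry['int_val'] = value
        elif isinstance(value, float):
            entry['double_val'] = value
        else:
            entry['string_val'] = str(value)
        attrs.append(entry)
    return {'attrs': attrs}

A "scan by type" query then reads, for example: SELECT a.key, a.int_val FROM dataset.table, UNNEST(attrs) AS a WHERE a.int_val IS NOT NULL.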

#2

For now, the best solution I've come up with is explicitly hardcoding a mapping of dict keys to the BigQuery schema. Two benefits: it works around the must-specify-schema issue, and it lets me filter out dict elements I don't want in BigQuery.

SCHEMA = {
  'field1': 'INTEGER',
  'field2': 'STRING',
  ...
}
schema_str = ','.join(['%s:%s' % (k, v) for k, v in SCHEMA.items()])

sink = BigQuerySink(tablename,
        dataset=dataset,
        project=project,
        schema=schema_str,
        write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

(pipeline
  # Filter each dict down to just the keys present in SCHEMA.
  | 'filter_fields' >> beam.ParDo(FilterFieldKeysDoFn(SCHEMA))
  | 'to_bigquery' >> beam.io.Write(sink))
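
FilterFieldKeysDoFn isn't shown above; a minimal sketch, assuming it simply drops keys absent from SCHEMA:

import apache_beam as beam

class FilterFieldKeysDoFn(beam.DoFn):
    def __init__(self, schema):
        self._keys = set(schema)  # SCHEMA's keys are the allowed fields

    def process(self, element):
        # Keep only the fields the BigQuery schema knows about.
        yield {k: v for k, v in element.items() if k in self._keys}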
