BigQueryIO返回TypedRead 而不是PCollection 。如何获得真实数据?

时间:2021-12-22 14:29:01

I have a problem with retrieving a data from bigquery table inside DoFn. I can't find example to extract values from TypedRead.

我在从DoFn中的bigquery表中检索数据时遇到问题。我找不到从TypedRead中提取值的示例。

This is a simplified pipeline. I would like to check does record with target SSN exists or not in bigquery table. The target SSN will be received via pubsub in real pipeline, I have replaced it with array of strings.

这是一个简化的管道。我想在bigquery表中查看目标SSN是否存在的记录。目标SSN将通过实际管道中的pubsub接收,我已经用字符串数组替换它。

final BigQueryIoTestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryIoTestOptions.class);

final List<String> SSNs = Arrays.asList("775-89-3939");

Pipeline p = Pipeline.create(options);

PCollection<String> ssnCollection = p.apply("GetSSNParams", Create.of(SSNs)).setCoder(StringUtf8Coder.of());

ssnCollection.apply("SelectFromBQ", ParDo.of(new DoFn<String, TypedRead<TableRow>>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    TypedRead<TableRow> tr =
    BigQueryIO.readTableRows()
    .fromQuery("SELECT pid19PatientSSN FROM dataset.table where pid19PatientSSN = '" + c.element() + "' LIMIT 1");

    c.output(tr);
  }
  }))
.apply("ParseResponseFromBigQuery", ParDo.of(new DoFn<TypedRead<TableRow>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    System.out.println(c.element().toString());
  }
}));

p.run();

1 个解决方案

#1


0  

Big query returns PCollection only, we can get the result as entry set like the below example or we can serialize to objects as well like mentioned here

大查询只返回PCollection,我们可以像下面的例子那样得到结果作为条目集,或者我们可以像这里提到的那样序列化到对象

If you want to query from BigQuery middle of your pipeline use BigQuery instead of BigQueryIO like mentioned here

如果你想从管道中的BigQuery查询,请使用BigQuery而不是像这里提到的BigQueryIO

BigQueryIO Example:

BigQueryIO示例:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
PCollection<TableRow> result = pipeline.apply(BigQueryIO.readTableRows()
                .fromQuery("SELECT id, name FROM [project-test:test_data.test] LIMIT 1"));
result.apply(MapElements.via(new SimpleFunction<TableRow, Void>() {
            @Override
            public Void apply(TableRow obj) {
                System.out.println("***" + obj);
                obj.entrySet().forEach(
                        (k)-> {
                            System.out.println(k.getKey() + " :" + k.getValue());
                        }
                );
                return null;
            }
        }));
        pipeline.run().waitUntilFinish();

BigQuery Example:

BigQuery示例:

// [START bigquery_simple_app_client]
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// [END bigquery_simple_app_client]
// [START bigquery_simple_app_query]
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
      "SELECT "
          + "CONCAT('https://*.com/questions/', CAST(id as STRING)) as url, "
          + "view_count "
          + "FROM `bigquery-public-data.*.posts_questions` "
          + "WHERE tags like '%google-bigquery%' "
          + "ORDER BY favorite_count DESC LIMIT 10")
        // Use standard SQL syntax for queries.
        // See: https://cloud.google.com/bigquery/sql-reference/
        .setUseLegacySql(false)
        .build();

// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

// Wait for the query to complete.
queryJob = queryJob.waitFor();

// Check for errors
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}
// [END bigquery_simple_app_query]

// [START bigquery_simple_app_print]
// Get the results.
QueryResponse response = bigquery.getQueryResults(jobId);

TableResult result = queryJob.getQueryResults();

// Print all pages of the results.
for (FieldValueList row : result.iterateAll()) {
  String url = row.get("url").getStringValue();
  long viewCount = row.get("view_count").getLongValue();
  System.out.printf("url: %s views: %d%n", url, viewCount);
}
// [END bigquery_simple_app_print]

#1


0  

Big query returns PCollection only, we can get the result as entry set like the below example or we can serialize to objects as well like mentioned here

大查询只返回PCollection,我们可以像下面的例子那样得到结果作为条目集,或者我们可以像这里提到的那样序列化到对象

If you want to query from BigQuery middle of your pipeline use BigQuery instead of BigQueryIO like mentioned here

如果你想从管道中的BigQuery查询,请使用BigQuery而不是像这里提到的BigQueryIO

BigQueryIO Example:

BigQueryIO示例:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
PCollection<TableRow> result = pipeline.apply(BigQueryIO.readTableRows()
                .fromQuery("SELECT id, name FROM [project-test:test_data.test] LIMIT 1"));
result.apply(MapElements.via(new SimpleFunction<TableRow, Void>() {
            @Override
            public Void apply(TableRow obj) {
                System.out.println("***" + obj);
                obj.entrySet().forEach(
                        (k)-> {
                            System.out.println(k.getKey() + " :" + k.getValue());
                        }
                );
                return null;
            }
        }));
        pipeline.run().waitUntilFinish();

BigQuery Example:

BigQuery示例:

// [START bigquery_simple_app_client]
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// [END bigquery_simple_app_client]
// [START bigquery_simple_app_query]
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
      "SELECT "
          + "CONCAT('https://*.com/questions/', CAST(id as STRING)) as url, "
          + "view_count "
          + "FROM `bigquery-public-data.*.posts_questions` "
          + "WHERE tags like '%google-bigquery%' "
          + "ORDER BY favorite_count DESC LIMIT 10")
        // Use standard SQL syntax for queries.
        // See: https://cloud.google.com/bigquery/sql-reference/
        .setUseLegacySql(false)
        .build();

// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

// Wait for the query to complete.
queryJob = queryJob.waitFor();

// Check for errors
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}
// [END bigquery_simple_app_query]

// [START bigquery_simple_app_print]
// Get the results.
QueryResponse response = bigquery.getQueryResults(jobId);

TableResult result = queryJob.getQueryResults();

// Print all pages of the results.
for (FieldValueList row : result.iterateAll()) {
  String url = row.get("url").getStringValue();
  long viewCount = row.get("view_count").getLongValue();
  System.out.printf("url: %s views: %d%n", url, viewCount);
}
// [END bigquery_simple_app_print]