BigQueryIO returns TypedRead<TableRow> instead of PCollection<TableRow>. How do I get the actual data?

Time: 2021-12-22 14:29:01

I have a problem retrieving data from a BigQuery table inside a DoFn. I can't find an example of how to extract values from a TypedRead.


This is a simplified pipeline. I would like to check whether a record with the target SSN exists in a BigQuery table. In the real pipeline the target SSN will arrive via Pub/Sub; here I have replaced it with a list of strings.


final BigQueryIoTestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryIoTestOptions.class);

final List<String> SSNs = Arrays.asList("775-89-3939");

Pipeline p = Pipeline.create(options);

PCollection<String> ssnCollection = p.apply("GetSSNParams", Create.of(SSNs)).setCoder(StringUtf8Coder.of());

ssnCollection.apply("SelectFromBQ", ParDo.of(new DoFn<String, TypedRead<TableRow>>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // This only builds a read transform; no query is executed here.
    TypedRead<TableRow> tr = BigQueryIO.readTableRows()
        .fromQuery("SELECT pid19PatientSSN FROM dataset.table where pid19PatientSSN = '" + c.element() + "' LIMIT 1");
    c.output(tr);
  }
}))
.apply("ParseResponseFromBigQuery", ParDo.of(new DoFn<TypedRead<TableRow>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // c.element() is a TypedRead<TableRow> here, not row data.
  }
}));

1 Answer



BigQueryIO.readTableRows() is a read transform: applying it to the Pipeline returns a PCollection<TableRow>. You can walk each row's entry set as in the example below, or map the rows to your own objects.
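As a rough sketch of the "map rows to objects" option: TableRow implements Map<String, Object>, so the mapping can be written against a plain Map. The TestRecord class is hypothetical, and the field names id and name are borrowed from the query in the BigQueryIO example; in a pipeline the same logic would sit in the function passed to MapElements.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type for the columns selected by "SELECT id, name ...".
final class TestRecord {
    final long id;
    final String name;

    TestRecord(long id, String name) {
        this.id = id;
        this.name = name;
    }

    // Builds a TestRecord from one row. A plain Map stands in for TableRow,
    // which also implements Map<String, Object>.
    static TestRecord fromRow(Map<String, Object> row) {
        Object id = row.get("id");
        Object name = row.get("name");
        if (id == null) {
            throw new IllegalArgumentException("row has no id: " + row);
        }
        // Integer columns may arrive as a String or as an integral Number,
        // so convert through String before parsing.
        long parsedId = Long.parseLong(String.valueOf(id));
        return new TestRecord(parsedId, name == null ? null : name.toString());
    }
}

class RowMappingDemo {
    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", "42");
        row.put("name", "Ada");
        TestRecord r = TestRecord.fromRow(row);
        System.out.println(r.id + " " + r.name);
    }
}
```

Keeping the conversion in one static method makes it easy to unit test without running a pipeline.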


If you want to query BigQuery in the middle of your pipeline (for example inside a DoFn), use the BigQuery client library (the BigQuery class) instead of BigQueryIO.
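The shape of such a per-element lookup can be sketched in plain Java. This is only an illustration under assumptions: SsnExistsFn and its Predicate seam are hypothetical names, and the in-memory set stands in for the table. In a real DoFn the predicate would be replaced by a query issued through a BigQuery client created once per DoFn instance (for example in a @Setup method) rather than once per element.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Plain-Java stand-in for a DoFn<String, String> that checks each SSN.
final class SsnExistsFn {
    // Hypothetical seam: returns true when the lookup for this SSN finds a row.
    // In a pipeline this is where the BigQuery client call would go.
    private final Predicate<String> hasRow;

    SsnExistsFn(Predicate<String> hasRow) {
        this.hasRow = hasRow;
    }

    // Counterpart of @ProcessElement: one lookup and one output per element.
    String process(String ssn) {
        return ssn + (hasRow.test(ssn) ? " exists" : " not found");
    }
}

class SsnLookupDemo {
    public static void main(String[] args) {
        // In-memory stub in place of the BigQuery table.
        Set<String> table = new HashSet<>(Arrays.asList("775-89-3939"));
        SsnExistsFn fn = new SsnExistsFn(table::contains);
        System.out.println(fn.process("775-89-3939"));
        System.out.println(fn.process("000-00-0000"));
    }
}
```

With the real client, passing the SSN as a named query parameter (QueryJobConfiguration.Builder.addNamedParameter with QueryParameterValue.string) avoids concatenating input into the SQL text as the question's code does.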


BigQueryIO Example:


PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
// Applying the read to the Pipeline itself is what produces a PCollection<TableRow>.
PCollection<TableRow> result = pipeline.apply(BigQueryIO.readTableRows()
                .fromQuery("SELECT id, name FROM [project-test:test_data.test] LIMIT 1"));
result.apply(MapElements.via(new SimpleFunction<TableRow, Void>() {
            @Override
            public Void apply(TableRow obj) {
                System.out.println("***" + obj);
                // TableRow is a Map, so its columns can be read as key/value entries.
                obj.entrySet().forEach(
                        (k)-> {
                            System.out.println(k.getKey() + " :" + k.getValue());
                        });
                return null;
            }
        }));
pipeline.run().waitUntilFinish();

BigQuery Example:


// [START bigquery_simple_app_client]
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// [END bigquery_simple_app_client]
// [START bigquery_simple_app_query]
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
      "SELECT "
          + "CONCAT('https://*.com/questions/', CAST(id as STRING)) as url, "
          + "view_count "
          + "FROM `bigquery-public-data.*.posts_questions` "
          + "WHERE tags like '%google-bigquery%' "
          + "ORDER BY favorite_count DESC LIMIT 10")
        // Use standard SQL syntax for queries.
        .setUseLegacySql(false)
        .build();

// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

// Wait for the query to complete.
queryJob = queryJob.waitFor();

// Check for errors
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}
// [END bigquery_simple_app_query]

// [START bigquery_simple_app_print]
// Get the results.
TableResult result = queryJob.getQueryResults();

// Print all pages of the results.
for (FieldValueList row : result.iterateAll()) {
  String url = row.get("url").getStringValue();
  long viewCount = row.get("view_count").getLongValue();
  System.out.printf("url: %s views: %d%n", url, viewCount);
}
// [END bigquery_simple_app_print]



