GCP Dataflow - Read a CSV file from Storage and write it to BigQuery

Time: 2021-11-13 13:54:37

I have a CSV file in Storage and I want to read it and write it into a BigQuery table. This is my CSV file, where the first line is the header:


GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114

This is my code:


    public class StorgeBq {

        public static class StringToRowConverter extends DoFn<String, TableRow> {

            private String[] columnNames;

            private boolean isFirstRow = true;

            @ProcessElement
            public void processElement(ProcessContext c) {
                TableRow row = new TableRow();

                String[] parts = c.element().split(",");

                if (isFirstRow) {
                    columnNames = Arrays.copyOf(parts, parts.length);
                    isFirstRow = false;
                } else {
                    for (int i = 0; i < parts.length; i++) {
                        row.set(columnNames[i], parts[i]);
                    }
                    c.output(row);
                }
            }
        }

        public static void main(String[] args) {

            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(DataflowPipelineOptions.class);
            options.setZone("europe-west1-c");
            options.setProject("mydata-dev");
            options.setRunner(DataflowRunner.class);
            Pipeline p = Pipeline.create(options);

            p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
                .apply("ConverToBqRow", ParDo.of(new StringToRowConverter()))
                .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
                    .to("mydata-dev:DF_TEST.dataflow_table")
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER));
            p.run().waitUntilFinish();
        }
    }

There are some problems: 1) When the job starts executing, I see a step called "DropInputs" which I have not defined in my code, and it starts running before all the other tasks. Why?


2) Why doesn't the pipeline start with the first task, "ReadLines"? 3) In the log file, I see that the "WriteToBq" task tries to use one of the data values as a field; for example, "1st Grade Teachers" is not a field but a value of "GroupName":


"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",

1 Answer

#1



You've a couple of problems in your code. But, first of all, regarding the "DropInputs" stage - you can safely ignore it. It was the result of this bug report. I still don't understand why it needs to be displayed (it's confusing a lot of our users too), and I'd love for a Googler to chime in on that. In my opinion it's just clutter.


Right, to your code now:


  1. You are assuming that the first row read will be your header. This is an incorrect assumption. Dataflow reads in parallel, so the header row may arrive at any time. Instead of using a boolean flag, check the string value of each element in your ParDo, e.g. if (c.element().contains("GroupName")) { return; } (a sketch appears after this list).
  2. You are missing the BigQuery table schema. You need to add withSchema(..) to your BigQuery sink. Here's an example from one of my public pipelines; a second sketch follows below.
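
For the first point, here is a minimal sketch of a header-safe version of the DoFn. It is only an illustration: the column names are hard-coded from the sample file above and the header is detected by its "GroupName," prefix; both are assumptions, not part of the original code.

    // Replacement sketch for StringToRowConverter: filters out the header row
    // instead of assuming it is the first element processed.
    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.DoFn;

    public class StringToRowConverter extends DoFn<String, TableRow> {

        // Column names taken from the sample CSV above; hard-coded for illustration.
        private static final String[] COLUMN_NAMES =
                {"GroupName", "Groupcode", "GroupOwner", "GroupCategoryID"};

        @ProcessElement
        public void processElement(ProcessContext c) {
            String line = c.element();
            // Drop the header wherever it turns up; elements are not read in order.
            if (line.startsWith("GroupName,")) {
                return;
            }
            // The -1 limit keeps trailing empty fields (e.g. a missing GroupOwner).
            String[] parts = line.split(",", -1);
            TableRow row = new TableRow();
            for (int i = 0; i < COLUMN_NAMES.length && i < parts.length; i++) {
                row.set(COLUMN_NAMES[i], parts[i]);
            }
            c.output(row);
        }
    }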

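For the second point, the pipeline linked in the answer is not reproduced here, but a minimal sketch of attaching a schema to the sink could look like the following. It assumes all four columns are STRING, which may not match the real table (GroupOwner and GroupCategoryID could well be INTEGER):

    // Needs com.google.api.services.bigquery.model.TableSchema / TableFieldSchema
    // and java.util.Arrays in addition to the classes already used in the question.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("GroupName").setType("STRING"),
            new TableFieldSchema().setName("Groupcode").setType("STRING"),
            new TableFieldSchema().setName("GroupOwner").setType("STRING"),
            new TableFieldSchema().setName("GroupCategoryID").setType("STRING")));

    p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
        .apply("ConverToBqRow", ParDo.of(new StringToRowConverter()))
        .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
            .to("mydata-dev:DF_TEST.dataflow_table")
            .withSchema(schema) // the piece missing from the original pipeline
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER));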