Unable to read a config text file (column names) from GCS in Dataflow

Asked: 2023-01-25 15:34:12

I have a source CSV file (without a header) and a header config CSV file (containing only the column names) in GCS. I also have a static table in BigQuery. I want to load the source file into the static table using the column header mapping from the config file.


I tried a different approach earlier: I kept the header and the data in the same source file, then tried to split the header off the source file and insert the data into BigQuery using the header column mapping. I realized this approach is not workable because Dataflow shuffles the data across multiple worker nodes, so I dropped it.


In the code below the column names are hard-coded. I am looking for an approach to read the column names from an external config file (I want to make my code dynamic).


package com.coe.cog;

import java.io.BufferedReader;
import java.util.*;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;

public class SampleTest {

    private static final Logger LOG = LoggerFactory.getLogger(SampleTest.class);

    public static TableReference getGCDSTableReference() {
        TableReference ref = new TableReference();
        ref.setProjectId("myownproject");
        ref.setDatasetId("DS_Employee");
        ref.setTableId("tLoad14");
        return ref;
    }

    static class TransformToTable extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {

            String csvSplitBy = ",";

            // Hard-coded column names, but I want to read this header from a GCS file.
            String lineHeader = "ID,NAME,AGE,SEX";

            String[] colmnsHeader = lineHeader.split(csvSplitBy); // Header only
            String[] split = c.element().split(csvSplitBy);       // Data section

            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                row.set(colmnsHeader[i], split[i]);
            }

            c.output(row);
        }
    }

    public interface MyOptions extends PipelineOptions {

        /*
         * Param
         */

    }

    public static void main(String[] args) {

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setTempLocation("gs://demo-bucket-data/temp");

        Pipeline p = Pipeline.create(options);

        PCollection<String> lines = p.apply("Read From Storage",
                TextIO.read().from("gs://demo-bucket-data/Demo/Test/SourceFile_WithOutHeader.csv"));

        PCollection<TableRow> rows = lines.apply("Transform To Table", ParDo.of(new TransformToTable()));

        rows.apply("Write To Table", BigQueryIO.writeTableRows().to(getGCDSTableReference())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

        p.run();
    }
}

Source File:


1,John,25,M 
2,Smith,30,M
3,Josephine,20,F

Config File (Headers only):


ID,NAME,AGE,SEX

1 Answer

#1


3  

You have a couple of options:


  1. Use a Dataflow/Beam side input to read the config/header file into some sort of collection, e.g. an ArrayList. It will be available to all workers in the cluster. You can then use the side input to dynamically assign the schema to the BigQuery table using DynamicDestinations (see the first sketch below).
  2. Before dropping into your Dataflow pipeline, call the GCS API directly to grab your config/header file, parse it, and then use the results to set up your pipeline (see the second sketch below).
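Here is a minimal sketch of option 1, slotted into your existing main method (reusing your p and lines variables). It assumes the header file lives at the hypothetical path gs://demo-bucket-data/Demo/Test/Header.csv and contains the single line ID,NAME,AGE,SEX; the header is read as a singleton side input and used inside the DoFn to build each TableRow (the DynamicDestinations part for the table schema is not shown). You would also need to import org.apache.beam.sdk.transforms.View and org.apache.beam.sdk.values.PCollectionView.

// Read the header config file (hypothetical path) and expose it as a singleton side input.
PCollectionView<String> headerView = p
        .apply("Read Header", TextIO.read().from("gs://demo-bucket-data/Demo/Test/Header.csv"))
        .apply("Header As Singleton", View.asSingleton());

PCollection<TableRow> rows = lines.apply("Transform To Table",
        ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // The header line is available on every worker through the side input.
                String[] columns = c.sideInput(headerView).split(",");
                String[] values = c.element().split(",");
                TableRow row = new TableRow();
                for (int i = 0; i < values.length; i++) {
                    row.set(columns[i], values[i]);
                }
                c.output(row);
            }
        }).withSideInputs(headerView));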
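And a rough sketch of option 2, assuming you read the header with Beam's FileSystems API before the pipeline is built (the Google Cloud Storage client library would work just as well); the path and the HeaderConfig/readHeaderLine names are made up for illustration.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

public class HeaderConfig {

    // Reads the single header line (e.g. "ID,NAME,AGE,SEX") from a GCS object.
    public static String readHeaderLine(String gcsPath) throws IOException {
        MatchResult.Metadata file = FileSystems.matchSingleFileSpec(gcsPath);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Channels.newInputStream(FileSystems.open(file.resourceId())),
                StandardCharsets.UTF_8))) {
            return reader.readLine();
        }
    }
}

In your main method you would call FileSystems.setDefaultPipelineOptions(options) right after creating the options (so the gs:// scheme is registered), read the header with HeaderConfig.readHeaderLine(...), and pass it into TransformToTable through a constructor that stores headerLine.split(",") in a final field.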
