How to skip embedded newlines in a CSV file when reading from Cloud Storage with Google Cloud Dataflow in Java

Time: 2021-08-19 15:43:52

I have a CSV file whose rows contain embedded newlines (\n). When reading the CSV file from Cloud Storage using Apache Beam's TextIO.read, each \n is treated as the start of a new record. How can I overcome this issue?

I have tried extending FileBasedSource, but it reads only the first line of the CSV file when we apply PTransforms.

Help will be appreciated.

Thanks in advance.

1 Solution

#1

TextIO cannot do this - it always splits its input on newlines and is not aware of CSV-specific quoting that makes some of those newlines part of a field rather than a record boundary.
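To see why newline splitting breaks quoted CSV, here is a minimal plain-Java sketch (no Beam dependency): a two-record CSV whose first field contains a quoted newline falls apart into three fragments under naive \n splitting, which is effectively what TextIO does.

```java
public class QuotedNewlineDemo {
    public static void main(String[] args) {
        // Two CSV records; the first field of record one contains
        // a newline inside quotes.
        String csv = "\"line one\npart two\",b\nc,d";

        // Naive splitting on \n ignores the quoting and produces
        // three fragments instead of two records.
        String[] naive = csv.split("\n");
        System.out.println(naive.length); // prints 3
    }
}
```

A CSV-aware parser, by contrast, keeps the quoted newline inside a single record, which is why the FileIO approach below hands the whole file to a CSV library instead of pre-splitting it.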

However, Beam 2.2 includes a transform, FileIO, that makes it easy to write the CSV-specific (or any other file-format-specific) reading code yourself. Do something like this:

p.apply(FileIO.match().filepattern("gs://..."))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
   @ProcessElement
   public void process(ProcessContext c) throws IOException {
     try (InputStream is = Channels.newInputStream(c.element().open());
          Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
       // Use your favorite Java CSV library; Apache Commons CSV is shown
       // here as one example. A CSV-aware parser keeps quoted newlines
       // inside a single record.
       for (CSVRecord record : CSVFormat.DEFAULT.parse(reader)) {
         // Emit each record (here as its String form; map it to your
         // target type, e.g. TableRow, as needed).
         c.output(record.toString());
       }
     }
   }
 }));
