Cloud Dataflow: Read an entire JSON array file from Cloud Storage and create a PCollection of JSON objects

Date: 2022-04-14 15:12:54

I have a JSON array file with the following content:

[ {  
  "MemberId" : "1234",  
  "Date" : "2017-07-03",  
  "Interactions" : [ {  
    "Number" : "1327",  
    "DwellTime" : "00:03:05"    
  } ]  
}, {  
  "MemberId" : "5678",  
  "Date" : "2017-07-03",  
  "Interactions" : [ {  
    "Number" : "1172",  
    "DwellTime" : "00:01:26"  
  } ]  
} ] 

I want to create a PCollection of Java objects, one mapped from each JSON object in the array.
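
For concreteness, the sketches in the answer below assume a hypothetical Member POJO along these lines, inferred from the sample above (Jackson binds public fields by name, and implementing Serializable lets Beam fall back to SerializableCoder):

import java.io.Serializable;
import java.util.List;

// Hypothetical POJO matching the sample JSON; the field names mirror the
// JSON keys exactly so Jackson can bind them without annotations.
public class Member implements Serializable {
  public String MemberId;
  public String Date;
  public List<Interaction> Interactions;

  public static class Interaction implements Serializable {
    public String Number;
    public String DwellTime;
  }
}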

1 Answer

#1



JSON formatted like this (records spread over multiple lines instead of one per line) is hard for a data processing tool like Beam/Dataflow to process in parallel: from a random point in the file, you cannot be sure where the next record begins. You can do it by reading from the beginning of the file, but then you're not really reading in parallel.

If it's possible, reformatting the file so that it has one record per line would let you use something like TextIO to read it.
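
For example, if the file were rewritten as one JSON object per line (no enclosing array), a pipeline along these lines would do it; this is a minimal sketch assuming the hypothetical Member POJO above and a made-up bucket path:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class ReadJsonLines {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadLines", TextIO.read().from("gs://my-bucket/members.ndjson"))
     .apply("ParseJson", ParDo.of(new DoFn<String, Member>() {
       // ObjectMapper is not serializable, so build one per DoFn instance.
       private transient ObjectMapper mapper;

       @Setup
       public void setup() {
         mapper = new ObjectMapper();
       }

       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // Each input line is one complete JSON object.
         c.output(mapper.readValue(c.element(), Member.class));
       }
     }));

    p.run().waitUntilFinish();
  }
}

Because TextIO splits on newlines, this version reads and parses in parallel.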

If not, you'll need to read the file in one go.

I would suggest a couple of possible approaches:

Write a ParDo that reads the file using the GCS API

This is pretty straightforward. You'll do all the reading in one ParDo, and you'll need to implement the connection code inside that ParDo. Inside it you would write the same code you would use to read the file in a normal Java program. The ParDo will emit each Java object as a record.
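
A minimal sketch of this approach, here using Beam's FileSystems utility (which resolves gs:// paths when the GCS extension is on the classpath) rather than the raw GCS client library, with Jackson parsing the whole array; the path and the Member POJO are the assumptions from above:

import java.nio.channels.Channels;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class ReadWholeJsonArray {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // A single element carrying the path fans out into one record per array entry.
    p.apply(Create.of("gs://my-bucket/members.json"))
     .apply(ParDo.of(new DoFn<String, Member>() {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // Resolve and open the file, then parse the top-level array in one go.
         MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(c.element());
         ObjectMapper mapper = new ObjectMapper();
         List<Member> members = mapper.readValue(
             Channels.newInputStream(FileSystems.open(metadata.resourceId())),
             mapper.getTypeFactory().constructCollectionType(List.class, Member.class));
         for (Member m : members) {
           c.output(m);
         }
       }
     }));

    p.run().waitUntilFinish();
  }
}

The whole file is still read by a single worker; parallelism only begins downstream of this ParDo.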

Implement a FileBasedSource

File-based sources will work: when the fileOrPatternSpec is "gs://..." it knows how to read from GCS. You'll need to make sure to set fileMetadata.isReadSeekEfficient to false so that it won't try to split the file. I haven't tried it, but I believe the correct way to do that is to set it inside the single-file constructor of FileBasedSource (i.e., your class's override of FileBasedSource(Metadata, long, long)).

TextSource/XmlSource (and their accompanying wrappers TextIO/XmlIO) are examples of this, except that they also implement splitting; yours will be much simpler since it won't.
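
In the same untested spirit, a rough sketch of such a source against the Beam 2.x API. It mirrors the pattern Beam uses for other unsplittable inputs (e.g. compressed files): the metadata is rebuilt with isReadSeekEfficient=false, only the first record is a split point, and every record reports the file's start offset. All class names here are assumptions:

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

public class WholeFileJsonSource extends FileBasedSource<Member> {

  public WholeFileJsonSource(String fileOrPatternSpec) {
    super(StaticValueProvider.of(fileOrPatternSpec), Long.MAX_VALUE);
  }

  // The single-file constructor: rebuild the metadata with
  // isReadSeekEfficient=false so the runner never tries to split the file.
  private WholeFileJsonSource(MatchResult.Metadata metadata, long start, long end) {
    super(
        MatchResult.Metadata.builder()
            .setResourceId(metadata.resourceId())
            .setSizeBytes(metadata.sizeBytes())
            .setIsReadSeekEfficient(false)
            .build(),
        Long.MAX_VALUE, start, end);
  }

  @Override
  protected FileBasedSource<Member> createForSubrangeOfFile(
      MatchResult.Metadata metadata, long start, long end) {
    return new WholeFileJsonSource(metadata, start, end);
  }

  @Override
  protected FileBasedReader<Member> createSingleFileReader(PipelineOptions options) {
    return new WholeFileReader(this);
  }

  @Override
  public Coder<Member> getOutputCoder() {
    return SerializableCoder.of(Member.class);
  }

  private static class WholeFileReader extends FileBasedReader<Member> {
    private Iterator<Member> records;
    private Member current;
    private long recordsReturned = 0;

    WholeFileReader(WholeFileJsonSource source) {
      super(source);
    }

    @Override
    protected void startReading(ReadableByteChannel channel) throws IOException {
      // Parse the entire top-level JSON array in one pass.
      ObjectMapper mapper = new ObjectMapper();
      List<Member> all = mapper.readValue(
          Channels.newInputStream(channel),
          mapper.getTypeFactory().constructCollectionType(List.class, Member.class));
      records = all.iterator();
    }

    @Override
    protected boolean readNextRecord() throws IOException {
      if (!records.hasNext()) {
        return false;
      }
      current = records.next();
      recordsReturned++;
      return true;
    }

    // Only the first record is a split point, as in Beam's compressed-file reader.
    @Override
    protected boolean isAtSplitPoint() {
      return recordsReturned == 1;
    }

    @Override
    protected long getCurrentOffset() {
      return getCurrentSource().getStartOffset();
    }

    @Override
    public Member getCurrent() {
      return current;
    }
  }
}

Reading it would then look like p.apply(Read.from(new WholeFileJsonSource("gs://my-bucket/members.json"))).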
