Using a PCollection as input to another PCollection

Time: 2022-06-15 14:07:22

Using python sdk in google dataflow, I would like to do a query like this:

query_a_and_b = "SELECT a, b FROM TableA"

This query returns a list of tuples I'd like to use to perform more queries:

query_param = "SELECT * from TableA WHERE a = {} and b = {}".format(a, b) (here I set TableA, but it will also be used with TableB, C and D, which are inner-joined with TableA...)

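As a concrete sketch of the string construction above (the helper name `build_queries` is illustrative; only the table names come from the question), the per-tuple queries for each table could be built like this:

```python
# Build one query per table for a single (a, b) tuple returned by the
# first query. `build_queries` and `query_param` are illustrative names.
TABLES = ["TableA", "TableB", "TableC", "TableD"]
query_param = "SELECT * FROM {table} WHERE a = {a} AND b = {b}"

def build_queries(a, b):
    """Return the parameterized query for every table for one (a, b) pair."""
    return [query_param.format(table=t, a=a, b=b) for t in TABLES]

print(build_queries(1, 2)[0])
# SELECT * FROM TableA WHERE a = 1 AND b = 2
```

Plain `str.format` interpolation is fine for trusted values; for untrusted or string-typed values, BigQuery's named query parameters avoid quoting issues.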
So what I am trying to do:

coll = (p
    | 'read a_b_tuples' >> beam.io.Read(beam.io.BigQuerySource(query=query_a_and_b, use_standard_sql=True)) 
    | 'Build SQL' >> beam.Map(lambda x: query_param.format(x['a'], x['b'])) 
    | 'Query pardo' >> beam.ParDo(lambda q: [beam.io.Read(beam.io.BigQuerySource(query=q, use_standard_sql=True))])
    | 'Save' >> beam.io.WriteToText('results.csv')
)

I am not sure this is the best approach, and it does not work. What is the preferred way to achieve this in Dataflow?

Ultimately, each of these queries will return a small number of rows (fewer than 5k) that I'd like to load into a pandas dataframe for filtering/processing, then combine TableA, B, C and D for every tuple (a, b) and write each tuple's dataframe to a CSV file.

I might be map-reducing the problem incorrectly, in the sense that I could instead use Beam's functions to group by a and b and then do my processing...?

1 Answer

#1


Beam doesn't directly support this for BigQuery yet. Some other transforms support similar use cases, e.g. JdbcIO.readAll() can query a database for a collection of query parameters, TextIO.readAll() can read a collection of filenames - but BigQueryIO doesn't do this yet, neither in the Java nor Python SDKs.

In your "Query pardo", you can instead explicitly talk to the BigQuery REST API - it should be fine because your queries return a small number of results.
