Checking whether a PCollection is empty -- Apache Beam

Date: 2021-12-01 14:18:45

Is there any way to check if a PCollection is empty?

I haven't found anything relevant in the documentation of Dataflow and Apache Beam.

2 answers

#1

There is no way to check the size of a PCollection without applying a PTransform to it (such as Count.globally() or Combine.globally() with a CombineFn), because a PCollection is not like a typical Java Collection.

It is an abstraction over a bounded or unbounded collection of data, which is fed to the operations (PTransforms) applied to it. It is also processed in parallel (as the P at the beginning of the class name suggests).

Therefore you need a mechanism that counts the elements on each worker/node and combines those partial counts into a single value. Whether that value is 0 or n cannot be known until the transformation has finished.
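
The mechanism described above can be modelled in plain Python, with no Beam involved: each "worker" reduces its own shard to a partial count, and the partial counts are merged into one total. This is only a model, not Beam code; the shard layout is a hypothetical stand-in for however a runner splits the data, and the four method names simply mirror those of Beam's CombineFn (create_accumulator, add_input, merge_accumulators, extract_output).

```python
from typing import Iterable, List


class CountModel:
    """Plain-Python model of a counting combiner (not the Beam class itself)."""

    def create_accumulator(self) -> int:
        return 0  # a fresh partial count on one worker

    def add_input(self, accumulator: int, element: object) -> int:
        return accumulator + 1  # one more element seen on this worker

    def merge_accumulators(self, accumulators: Iterable[int]) -> int:
        return sum(accumulators)  # combine the partial counts of all workers

    def extract_output(self, accumulator: int) -> int:
        return accumulator  # the final size of the whole collection


def count_across_workers(shards: List[List[object]]) -> int:
    """Each inner list stands for the elements one worker happened to receive."""
    fn = CountModel()
    partials = []
    for shard in shards:  # a real runner processes these shards in parallel
        acc = fn.create_accumulator()
        for element in shard:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    # Only once every partial count is merged can 0 be told apart from n.
    return fn.extract_output(fn.merge_accumulators(partials))
```

For example, count_across_workers([["a", "b"], [], ["c"]]) returns 3 even though one worker saw nothing: a single worker with an empty shard cannot conclude on its own that the whole collection is empty.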

#2

You didn't specify which SDK you're using, so I assumed Python. The code is easily portable to Java.

You can count the elements globally and then map the resulting number to a boolean with a simple comparison. That boolean can then be passed as a side input using pvalue.AsSingleton, like this:

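import apache_beam as beam
from apache_beam import pvalue

# your_pcollection is the PCollection you want to test for emptiness.
is_empty_check = (your_pcollection
                    | "Count" >> beam.combiners.Count.Globally()
                    | "Is empty?" >> beam.Map(lambda n: n == 0)
                    )

# The side input must be consumed by a transform applied to a PCollection
# (other_pcollection is a placeholder), not to the pipeline object p itself.
# do_something (below) must be defined before this statement runs.
another_pipeline_branch = (
    other_pcollection
    | "Use check" >> beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check))
)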

The side input arrives as an extra keyword argument of the mapped function:

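def do_something(element, is_empty):
    if is_empty:
        # yes: your_pcollection contained no elements
        pass
    else:
        # no: your_pcollection contained at least one element
        pass
    # beam.Map emits the return value; here the element passes through unchanged
    return element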
