rish0097

Reputation: 1094

Check if PCollection is empty - Apache Beam

Is there any way to check if a PCollection is empty?

I haven't found anything relevant in the documentation of Dataflow and Apache Beam.

Upvotes: 1

Views: 6164

Answers (2)

Ozgun Alan

Reputation: 320

There is no way to check the size of a PCollection without applying a PTransform to it (such as Count.globally(), or Combine.globally() with a CombineFn), because a PCollection is not like a typical Java Collection.

It is an abstraction over a bounded or unbounded collection of data, where the data flows through the operations (PTransforms) applied to it. It is also parallel, as the P at the beginning of the class name suggests: its elements are spread across workers.

Therefore you need a mechanism that gets the element counts from each worker/node and combines them into a single value. Whether that value is 0 or n cannot be known until the transform has finished.
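To make the "count per worker, then combine" idea concrete, here is a minimal plain-Python sketch that does not use Apache Beam at all. The class `CountFn` and the helper `count_globally` are hypothetical names for illustration; they only mirror the create/add/merge/extract method shape of a Beam CombineFn, and the list of shards stands in for data spread across workers.

```python
from functools import reduce
from typing import Iterable, List


class CountFn:
    """Hypothetical plain-Python stand-in for a CombineFn that counts elements.

    It mirrors the four CombineFn method names but does not import or
    subclass apache_beam.
    """

    def create_accumulator(self) -> int:
        # Every worker starts from an empty partial count.
        return 0

    def add_input(self, accumulator: int, element: object) -> int:
        # Each element seen on a worker bumps that worker's partial count.
        return accumulator + 1

    def merge_accumulators(self, accumulators: Iterable[int]) -> int:
        # Partial counts from all workers are summed into one value.
        return sum(accumulators)

    def extract_output(self, accumulator: int) -> int:
        return accumulator


def count_globally(shards: List[List[object]], fn: CountFn) -> int:
    """Simulate a global combine over data split across several workers."""
    # Phase 1: each worker folds its own shard into a local accumulator.
    partials = [reduce(fn.add_input, shard, fn.create_accumulator()) for shard in shards]
    # Phase 2: the partial results are merged; only now is the total known.
    # Merging starts from a fresh accumulator, so zero shards still yields 0.
    merged = fn.merge_accumulators([fn.create_accumulator(), *partials])
    return fn.extract_output(merged)


if __name__ == "__main__":
    fn = CountFn()
    print(count_globally([["a", "b"], [], ["c"]], fn))  # 3
    print(count_globally([[], []], fn) == 0)  # True: the data set is empty
```

No single shard can answer "is it empty?" on its own; an empty shard only says that one worker saw nothing. The answer exists only after the merge step, which is the point made above.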

Upvotes: 2

Marcin Zablocki

Reputation: 10683

You didn't specify which SDK you're using, so I assumed Python. The code is easily portable to Java.

You can count the elements globally, then map the count to a boolean with a simple comparison. The resulting single-element PCollection can be passed to another transform as a side input using pvalue.AsSingleton, like this:

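import apache_beam as beam
from apache_beam import pvalue

# Count all elements, then turn the count into a boolean flag.
# In the default global window, Count.Globally() emits 0 for an empty input,
# so the flag is True only when your_pcollection is empty.
is_empty_check = (
    your_pcollection
    | "Count" >> beam.combiners.Count.Globally()
    | "Is empty?" >> beam.Map(lambda n: n == 0)
)

# Side inputs are consumed by a transform applied to another PCollection
# (not to the pipeline object itself).
another_pipeline_branch = (
    another_pcollection
    | "Use emptiness flag" >> beam.Map(
        do_something, is_empty=pvalue.AsSingleton(is_empty_check))
)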

The side input arrives as an ordinary keyword argument of do_something (define the function before building the pipeline):

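def do_something(element, is_empty):
    if is_empty:
        # yes: the checked PCollection has no elements
        pass
    else:
        # no: it contains at least one element
        pass
    return element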

Upvotes: 6
