Dipak Monty

Reputation: 31

How to exit from the DoFn lifecycle in Apache Beam when a condition is met?

I am reading about lifecycle management of the Apache Beam DoFn. I know how to write a DoFn using the lifecycle methods, but I want to know whether we can stop processing elements if a certain condition is met.

In the code below, the class has a temp list containing the element 1. When a bundle starts, I check whether 1 is in temp. Since it is, I call the finish_bundle method. I expected that calling finish_bundle would stop the processing of elements. Instead, finish_bundle runs and then the elements are processed anyway.

from datetime import datetime

import apache_beam as beam


class ChangeWordDoFn(beam.DoFn):
  def __init__(self):
    self.temp = [1]

  def process(self, element):
    print("Processing element: %s" % element)
    yield element.upper()

  def start_bundle(self):
    if 1 in self.temp:
      print("1 in bundle")
      self.finish_bundle()
    else:
      print("Bundle started")

  def finish_bundle(self):
    print("Bundle finished")

  def setup(self):
    dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("Worker started %s" % dt_string)

  def teardown(self):
    dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("Worker finished %s" % dt_string)

Here is how I apply it, followed by the output:

changed = sane | beam.ParDo(ChangeWordDoFn()) \
              | beam.Map(print)

ib.show(changed)

Worker started 27/08/2022 09:05:09
1 in bundle
Bundle finished
Processing element: en
EN
Processing element: un
UN
. . .

Upvotes: 0

Views: 181

Answers (1)

robertwb

Reputation: 5104

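Methods such as finish_bundle are callbacks that the runner invokes when a bundle finishes, not triggers that cause the bundle to finish. Calling self.finish_bundle() yourself just runs that method like any other Python method; the runner still passes the bundle's elements to process and calls finish_bundle again at the end.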
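To see why, it can help to model the order in which a runner calls the lifecycle methods. The sketch below is a toy driver, not Beam's actual runner code: ToyDoFn and run_bundle are hypothetical names, and setup/teardown are omitted. Because the driver owns the loop over elements, calling finish_bundle from start_bundle only runs it early, which reproduces the pattern seen in the question's output.

```python
# Toy model of the bundle lifecycle (NOT Beam's runner): it only mimics call order.
log = []


class ToyDoFn:
    """Mimics the question's DoFn: start_bundle calls finish_bundle itself."""

    def start_bundle(self):
        log.append("start_bundle")
        # An ordinary method call; it sends no signal to the driver.
        self.finish_bundle()

    def process(self, element):
        log.append("process %s" % element)
        yield element.upper()

    def finish_bundle(self):
        log.append("finish_bundle")


def run_bundle(dofn, elements):
    """Hypothetical driver: the caller, not the DoFn, controls the loop."""
    outputs = []
    dofn.start_bundle()
    for element in elements:
        # Iterating the generator runs process() for this element.
        outputs.extend(dofn.process(element))
    dofn.finish_bundle()
    return outputs


result = run_bundle(ToyDoFn(), ["en", "un"])
print(result)
print(log)
```

The log shows finish_bundle running once early (from start_bundle) and once more after all elements have been processed.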

What you can do instead is record information on the DoFn instance to make subsequent process calls no-ops, e.g.

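import apache_beam as beam


class MyDoFn(beam.DoFn):
  def __init__(self):
    self.temp = [1]

  def is_finished(self, element):
    # Hypothetical stand-in for [bundle-finishing-condition];
    # replace with your own check.
    return element == "stop"

  def process(self, element):
    if not self.done:
      print("Processing element: %s" % element)
      yield element.upper()
      if self.is_finished(element):
        # Later process calls in this bundle become no-ops.
        self.done = True

  def start_bundle(self):
    # Reset the flag at the start of every bundle.
    self.done = False
    print("Bundle started")

  def finish_bundle(self):
    print("Bundle finished")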
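The flag pattern can be tried outside Beam with a toy driver that runs two bundles. StopAfterDoFn, its stop_word condition and run_bundles are hypothetical stand-ins, not Beam APIs. In a real pipeline the class would subclass beam.DoFn and the runner would make these calls.

```python
# Toy model of the "done flag" pattern (NOT Beam's runner).
class StopAfterDoFn:
    def __init__(self, stop_word):
        # Hypothetical condition: stop once this word has been processed.
        self.stop_word = stop_word

    def start_bundle(self):
        # Reset per bundle, so every new bundle is processed again.
        self.done = False

    def process(self, element):
        if self.done:
            return  # no-op: emit nothing for the rest of this bundle
        yield element.upper()
        if element == self.stop_word:
            self.done = True

    def finish_bundle(self):
        pass


def run_bundles(dofn, bundles):
    """Hypothetical driver that feeds each bundle through the lifecycle."""
    outputs = []
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            outputs.extend(dofn.process(element))
        dofn.finish_bundle()
    return outputs


result = run_bundles(StopAfterDoFn("un"), [["en", "un", "de"], ["fr", "un", "it"]])
print(result)
```

Note that the elements are still delivered to process; the flag only skips the work. It also applies only to the current bundle on that DoFn instance, and which elements count as "after" the condition depends on the order in which the runner delivers them, which Beam does not generally guarantee.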

Upvotes: 0
