Reputation: 31
I am going through the lifecycle management of an Apache Beam DoFn. I know how to write a DoFn using the lifecycle methods, but I want to know whether we can stop processing elements once a certain condition is met.
In the code below, the class has a list, temp, containing the element 1. When a bundle starts, I check whether 1 is in temp, which is true, and then call the finish_bundle method. I expected that calling finish_bundle would stop the elements from being processed. Instead, finish_bundle executes and the elements are processed anyway.
import apache_beam as beam
from datetime import datetime

class ChangeWordDoFn(beam.DoFn):
    def __init__(self):
        self.temp = [1]

    def process(self, element):
        print("Processing element: %s" % element)
        yield element.upper()

    def start_bundle(self):
        if 1 in self.temp:
            print("1 in bundle")
            self.finish_bundle()
        else:
            print("Bundle started")

    def finish_bundle(self):
        print("Bundle finished")

    def setup(self):
        dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        print("Worker started %s" % dt_string)

    def teardown(self):
        dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        print("Worker finished %s" % dt_string)
Here is how I run it, followed by the output:
changed = sane | beam.ParDo(ChangeWordDoFn()) \
| beam.Map(print)
ib.show(changed)
Worker started 27/08/2022 09:05:09
1 in bundle
Bundle finished
Processing element: en
EN
Processing element: un
UN
. . .
Upvotes: 0
Views: 181
Reputation: 5104
Methods such as finish_bundle are callbacks that the runner invokes when a bundle finishes; calling them yourself does not trigger the end of the bundle.
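To make that concrete, here is a minimal sketch in plain Python. ToyDoFn and run_bundle are hypothetical stand-ins written for illustration, not Beam's actual runner code; the point is only that the loop driving the bundle lives outside the DoFn, so calling finish_bundle by hand just runs its body.

```python
class ToyDoFn:
    """Stand-in for a DoFn; records each call so the order can be inspected."""

    def __init__(self):
        self.log = []

    def start_bundle(self):
        self.log.append("start_bundle")
        # Calling the callback by hand only runs its body; nothing is stopped.
        self.finish_bundle()

    def process(self, element):
        self.log.append("process %s" % element)
        yield element.upper()

    def finish_bundle(self):
        self.log.append("finish_bundle")


def run_bundle(fn, elements):
    """Hypothetical, simplified runner loop: it alone decides when a bundle ends."""
    fn.start_bundle()
    outputs = []
    for element in elements:
        outputs.extend(fn.process(element))
    fn.finish_bundle()
    return outputs


fn = ToyDoFn()
outputs = run_bundle(fn, ["en", "un"])
print(outputs)
print(fn.log)
```

The log shows finish_bundle recorded twice: once from the manual call in start_bundle, and once at the end when the loop itself finishes the bundle. Every element is still processed in between.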
What you can do instead is record information on the DoFn instance to make subsequent process calls no-ops, e.g.
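import apache_beam as beam

class MyDoFn(beam.DoFn):
    def __init__(self):
        self.temp = [1]
        self.done = False

    def process(self, element):
        # Once the flag is set, the remaining elements of the bundle are skipped.
        if not self.done:
            print("Processing element: %s" % element)
            yield element.upper()
            # Placeholder: substitute your own stopping condition here.
            if [bundle-finishing-condition]:
                self.done = True

    def start_bundle(self):
        # Reset the flag at the start of every bundle.
        self.done = False
        print("Bundle started")

    def finish_bundle(self):
        print("Bundle finished")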
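For a version you can run as-is, here is a sketch of the same pattern with a concrete condition filled in. The class name StopAfterMarkerDoFn and the stop_marker parameter are assumptions made up for this example, as is the rule "stop when the marker element is seen". The DoFn is driven by hand rather than by a runner, and the import falls back to a plain base class so the snippet also runs where Beam is not installed.

```python
try:
    import apache_beam as beam
    _Base = beam.DoFn
except ImportError:  # fallback so the sketch runs without Beam installed
    _Base = object


class StopAfterMarkerDoFn(_Base):
    """Skips the rest of a bundle once a marker element has been seen."""

    def __init__(self, stop_marker="stop"):
        super().__init__()
        self.stop_marker = stop_marker  # hypothetical stopping condition
        self.done = False

    def start_bundle(self):
        # The flag is per bundle, so reset it whenever a new bundle starts.
        self.done = False

    def process(self, element):
        if self.done:
            return  # no-op for the remaining elements of this bundle
        if element == self.stop_marker:
            self.done = True
            return
        yield element.upper()


# Drive one bundle by hand, the way a runner would call the methods.
fn = StopAfterMarkerDoFn()
fn.start_bundle()
results = [out for e in ["en", "un", "stop", "de"] for out in fn.process(e)]
print(results)
```

Note that the elements after the marker are still passed to process; they are just dropped. Because the flag is reset in start_bundle and the runner chooses the bundle boundaries, this only skips the rest of the current bundle, not the rest of the input.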
Upvotes: 0