I am trying to compare the performance of Map and ParDo, but I cannot get the ParDo version to run.
I have already looked for resources that address this problem, but I did not find any.
ParDo method (this does not work):
class ci(beam.DoFn):
    def compute_interest(self, data_item):
        cust_id, cust_data = data_item
        if cust_data['basic'][0]['acc_opened_date'] == '2010-10-10':
            new_data = {}
            new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
            new_data.update(cust_data['account'][0])
            new_data.update(cust_data['basic'][0])
            del new_data['cur_bal']
            return new_data
Map method (this works):
def compute_interest(data_item):
    cust_id, cust_data = data_item
    if cust_data['basic'][0]['acc_opened_date'] == '2010-10-10':
        new_data = {}
        new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
        new_data.update(cust_data['account'][0])
        new_data.update(cust_data['basic'][0])
        del new_data['cur_bal']
        return new_data
Error:
    raise NotImplementedError
RuntimeError: NotImplementedError [while running 'PIPELINE NAME']
beam.DoFn expects the processing logic to live in a method named process, so compute_interest has to be renamed:
def process(self, element):
As explained in section 4.2.1.2 of the Beam programming guide:
Inside your DoFn subclass, you’ll write a method process where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your process method should accept an object of type element. This is the input element and output is emitted by using yield or return statement inside process method.
As an example, we'll define both a Map function and a ParDo DoFn:
def compute_interest_map(data_item):
    return data_item + 1

class compute_interest_pardo(beam.DoFn):
    def process(self, element):
        yield element + 2
If you rename process to any other method name, you'll get the NotImplementedError.
And the main pipeline will be:
events = (p
          | 'Create' >> beam.Create([1, 2, 3])
          | 'Add 1' >> beam.Map(compute_interest_map)
          | 'Add 2' >> beam.ParDo(compute_interest_pardo())
          # log_results is a DoFn that logs each element (its definition is not shown)
          | 'Print' >> beam.ParDo(log_results()))
Output:
INFO:root:>> Interest: 4
INFO:root:>> Interest: 5
INFO:root:>> Interest: 6