I am creating a Google Cloud Dataflow pipeline using Apache Beam 2.x and Python.
I have a text file in which each line contains an English sentence.
I am trying to call the Google Cloud Natural Language API (sentiment analysis) for every line/sentence.
So I have a DoFn that calls the Natural Language API:
import re

import apache_beam as beam
from google.cloud import language


class CalculateSentiments(beam.DoFn):
    def process(self, element):
        language_client = language.Client()
        # Strip HTML tags, then replace every non-word character with a space
        pre_text = re.sub('<[^>]*>', '', element)
        text = re.sub(r'[^\w]', ' ', pre_text)
        document = language_client.document_from_text(text)
        sentiment = document.analyze_sentiment().sentiment
        return sentiment.score
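The two `re.sub` calls at the start of `process` are plain text cleaning, and they can be checked offline. The sketch below pulls them into a hypothetical helper, `clean_text` (not part of the original code), to show what text is actually sent to the API. Note that punctuation, including apostrophes, becomes spaces.

```python
import re


def clean_text(element: str) -> str:
    """Hypothetical helper applying the same two substitutions as process()."""
    pre_text = re.sub(r'<[^>]*>', '', element)  # drop anything that looks like an HTML tag
    return re.sub(r'[^\w]', ' ', pre_text)       # replace each non-word character with a space


print(repr(clean_text("<p>I love it!</p>")))  # 'I love it '
```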
I am using ParDo to call this function for every sentence. I am assuming that the following ParDo calls the sentiment API for every line of the text file automatically (i.e., I don't have to iterate through the lines of the text file myself?):
output = lines | beam.ParDo(CalculateSentiments())
output | WriteToText(known_args.output)
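To make the per-element behaviour concrete, here is a simplified, hypothetical model of how a ParDo consumes the output of `process()`. It is not Apache Beam's implementation, and `run_pardo`, `ReturnsFloat` and `ReturnsList` are illustrative names; `len(element)` stands in for the sentiment score so the sketch runs without any API. It shows two things: the runner, not your code, loops over the input elements, and whatever `process()` returns is iterated, with each item becoming one output element.

```python
from collections.abc import Iterable


class ReturnsFloat:
    """Mimics a DoFn whose process() returns a bare number."""
    def process(self, element):
        return float(len(element))  # stand-in for sentiment.score


class ReturnsList:
    """Mimics a DoFn that wraps the number in a one-element list."""
    def process(self, element):
        return [float(len(element))]


def run_pardo(dofn, elements):
    """Hypothetical mini-runner, NOT Beam's implementation.

    Calls process() once per input element and flattens the results,
    so the caller never loops over the lines itself.
    """
    outputs = []
    for element in elements:
        result = dofn.process(element)
        if result is None:
            continue  # in this sketch, None means "emit nothing"
        if isinstance(result, (str, bytes)) or not isinstance(result, Iterable):
            # Iterating a str would emit single characters, so this sketch rejects it too.
            raise TypeError("process must return an iterable, got %s"
                            % type(result).__name__)
        outputs.extend(result)  # every item becomes one output element
    return outputs


print(run_pardo(ReturnsList(), ["good day", "bad"]))  # [8.0, 3.0]
```

Calling `run_pardo(ReturnsFloat(), ["good day"])` raises `TypeError`, which mirrors the kind of check that fails in the traceback.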
But I get this error when I run the pipeline:
TypeCheckError: FlatMap and ParDo must return an iterable. was returned instead. [while running 'ParDo(CalculateSentiments)']
Traceback (most recent call last):
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 297, in __call__
    evaluator.process_element(value)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 366, in process_element
    self.runner.process(element)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 267, in process
    self.reraise_augmented(exn)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 263, in process
    self._dofn_simple_invoker(element)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/runners/common.py", line 198, in _dofn_simple_invoker
    self._process_outputs(element, self.dofn_process(element.value))
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 60, in process
    return self.wrapper(self.dofn.process, args, kwargs)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 84, in wrapper
    return self._check_type(result)
  File "/Users/gsattanthan/.local/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 98, in _check_type
    % type(output))
What am I doing wrong? The way I use ParDo is very similar to what is shown in the Apache Beam documentation!
Any thoughts?
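Actually, wrapping the return value in square brackets fixed it.
Now it makes sense why the error says the output must be an iterable: ParDo iterates over whatever `process` returns and emits each item as a separate output element, so a bare float fails the check, while a one-element list emits exactly one score per input line.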
return [sentiment.score]
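An equivalent and common way to satisfy the iterable requirement is to make `process` a generator with `yield`. The sketch below assumes a hypothetical offline stand-in, `score_sentiment`, in place of the Natural Language API call (it just counts two hard-coded words and is not a real sentiment model), and it omits the `beam.DoFn` base class so it runs without Beam installed; in a real pipeline the class would subclass `apache_beam.DoFn`.

```python
from typing import Iterator


def score_sentiment(text: str) -> float:
    """Hypothetical stand-in for the Natural Language API call.

    Counts hard-coded words so the example runs offline; not a real model.
    """
    words = text.lower().split()
    return float(words.count("good") - words.count("bad"))


class CalculateSentiments:  # would subclass apache_beam.DoFn in a real pipeline
    def process(self, element: str) -> Iterator[float]:
        # yield turns process() into a generator, which is iterable, so the
        # DoFn emits exactly one score per input line, like `return [score]`.
        yield score_sentiment(element)


print(list(CalculateSentiments().process("good day")))  # [1.0]
```

`yield` also makes it easy to emit zero or several outputs for one input later on. If exactly one output per input is always wanted, `beam.Map` with a plain function is another option, since Map emits the function's return value as a single element.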