Reputation: 23
I am working with Apache Beam on Google Dataflow. I call a function, sentiment, from inside a lambda, and I get an error saying that the function name is not defined.
output_tweets = (lines
    | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
    | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
    | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
)
This is my Apache Beam pipeline. The last step calls the function sentiment, which is where the problem occurs.
The function is defined as follows (I don't think its body should matter):
def sentiment(messages):
    if not isinstance(messages, list):
        messages = [messages]
    instances = list(map(lambda message: json.loads(message), messages))
    lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
    for instance in instances['text']:
        response = lservice.documents().analyzeSentiment(
            body ={
                'document': {
                    'type': 'PLAIN_TEXT',
                    'content': instance
                }
            }
        ).execute()
        instance['polarity'] = response['documentSentiment']['polarity']
        instance['magnitude'] = response['documentSentiment']['magnitude']
    return instances
I get the following traceback:
File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']
Any idea?
Upvotes: 2
Views: 1805
Reputation: 2825
This issue can happen for a couple of reasons. Check the following:
1. Is the sentiment definition present in the same Python file as the Beam pipeline?
2. Is sentiment defined before it is called in the Beam pipeline?
I did a quick test, shown below. If both of the conditions above are met, it works as desired:
def testing(messages):
    return messages.lower()

windowed_lower_word_counts = (windowed_words
    | beam.Map(lambda word: testing(word))
    | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)
0 b'have' 3 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
1 b'ransom' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
2 b'let' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
3 b'me' 1 2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s) Pane 0
If the function is defined after it is called, we get the error shown below:
windowed_lower_word_counts = (windowed_words
    | beam.Map(lambda word: testing_after(word))
    | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
    | beam.Map(lambda word: testing_after_new(word))
NameError: name 'testing_after' is not defined
def testing_after(messages):
    return messages.lower()
Instead of passing the function as beam.FlatMap(lambda x: fn(x)),
pass the function directly as beam.FlatMap(fn).
I believe that in the first case Beam looks up fn by name on the worker machines and is not able to find it. The implementation details can be found here: https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780
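A rough way to picture the difference, using only the standard library: the sketch below rebuilds a function with an empty global namespace, as a simplified stand-in for a worker process that never received the main module's globals. This is an assumption-laden model, not how Beam actually serializes functions. ship_without_globals is a hypothetical helper, and sentiment here is a toy replacement for the real function.

```python
import builtins
import types


def sentiment(message):
    # Toy stand-in for the real sentiment function (assumption for this sketch).
    return [message.upper()]


def ship_without_globals(fn):
    """Rebuild fn with no module globals, imitating a worker that lacks them."""
    return types.FunctionType(
        fn.__code__,
        {"__builtins__": builtins},  # built-ins only, nothing from this module
        fn.__name__,
        fn.__defaults__,
        fn.__closure__,
    )


# Case 1: a lambda that refers to sentiment by name.
via_lambda = lambda x: sentiment(x)
worker_lambda = ship_without_globals(via_lambda)
try:
    worker_lambda("hi")
    lambda_error = None
except NameError as exc:
    lambda_error = str(exc)

# Case 2: the function itself is what gets shipped.
worker_direct = ship_without_globals(sentiment)
direct_result = worker_direct("hi")

print(lambda_error)
print(direct_result)
```

Note that the direct case only works in this sketch because the toy sentiment uses no module-level names. A function that relies on module-level imports or constants (such as json, discovery, or APIKEY in the question) needs those names on the worker as well. Beam's save_main_session pipeline option is intended for that situation.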
Upvotes: 5