Ankit Dhall

Reputation: 23

Dataflow - Function not being called - Error - name not defined

I am working with Apache Beam on Google Dataflow. I call a function, sentiment, through a lambda, and I get an error saying that the function name is not defined.

output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )

This is my Apache Beam pipeline. The last step calls the function sentiment, which is what triggers the error.

The function code is as follows (which I don't think should matter):

def sentiment(messages):
    if not isinstance(messages, list):
        messages = [messages]

    instances = list(map(lambda message: json.loads(message), messages))
    lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
    for instance in instances:
        response = lservice.documents().analyzeSentiment(
            body={
                'document': {
                    'type': 'PLAIN_TEXT',
                    'content': instance['text']
                }
            }
        ).execute()
        instance['polarity'] = response['documentSentiment']['polarity']
        instance['magnitude'] = response['documentSentiment']['magnitude']

    return instances

I get the following traceback

  File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']

Any idea?

Upvotes: 2

Views: 1805

Answers (1)

Jayadeep Jayaraman

Reputation: 2825

This issue can happen for a couple of reasons. Check the following:

  1. Is the definition of the function sentiment in the same Python file as the Beam pipeline?
  2. Is the function sentiment defined before it is called in the Beam pipeline?

I ran a quick test, shown below, and when both conditions are met it works as expected:

def testing(messages):
    return messages.lower()

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

0   b'have'     3   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
1   b'ransom'   1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
2   b'let'      1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
3   b'me'       1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0

If the function is defined after it is called, we get the error shown below:

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing_after(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
 Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
    | beam.Map(lambda word: testing_after(word))
NameError: name 'testing_after' is not defined

def testing_after(messages):
    return messages.lower()
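
To illustrate point 2 with plain Python (no Beam involved): a lambda looks up the names in its body when it is called, not when it is created. In a notebook, ib.show is what runs the pipeline, so the function has to exist by then. The names build_step and normalize below are made up for this illustration.

```python
def build_step():
    # The lambda body only mentions normalize by name; nothing is looked up yet.
    return lambda word: normalize(word)


step = build_step()

try:
    step("Have")  # normalize does not exist at this point
except NameError as exc:
    first_error = str(exc)


def normalize(word):
    return word.lower()


# Same lambda object as before; the lookup succeeds now that normalize exists.
result = step("Have")

print(first_error)
print(result)
```

The first call fails with a NameError and the second one works, even though the lambda itself never changed.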

UPDATE

Instead of passing the function as beam.FlatMap(lambda x: fn(x)), pass the function directly as beam.FlatMap(fn).

I believe that in the first case Beam tries to look up fn on the worker machines and cannot find it. The implementation details can be found here: https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780

Upvotes: 5
