Idhem

NameError: name 'beam' is not defined in <lambda>

I'm running a Beam pipeline on Google Dataflow. For a long time it worked fine with no issues. Now, without any changes to the code, when I submit it to Dataflow I get this error:

wrapper = lambda x: [fn(x)]
File "read_from_gcs.py", line 391, in <lambda>
NameError: name 'beam' is not defined [while running 'Covert to Row-ptransform-2054']

I already imported apache_beam as beam (I use it to call all the transformations). The error points to the following code:

schema_for_dedup = (
    distinct_without_chain
    | 'Filter nulls' >> beam.Filter(
        lambda r: r['key1'] != None and r['key2'] != None and r['key3'] != None)
    | 'Covert to Row' >> beam.Map(lambda val: beam.Row(
        k=val['k'],
        k2=val['k2'],
    ))
)

The weird thing is that when I run the job locally, it runs successfully with no issues. But when I submit it to Dataflow with the DataflowRunner, I get this error. I'm using apache-beam 2.32.0.

Did Google change something in the Dataflow infrastructure recently? How can it have been working a few days ago, and still work fine on my local machine, but produce this error now?

Has anyone faced this issue before?


Answers (1)

robertwb

Is this in the __main__ module? That could be why beam is not defined in your lambda when running on a remote machine. You could try passing the --save_main_session pipeline option, moving all your code into another module that you import from your main module, or putting your imports inside a function, e.g.

def run():
  import apache_beam as beam
  # [build your pipeline as normal]
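For the --save_main_session route, it can be passed on the command line like any other pipeline option (the project and region values below are placeholders, not from the question):

```shell
python read_from_gcs.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=europe-west4 \
  --save_main_session
```

With this flag, Beam pickles the state of the __main__ module (including its imports) and restores it on the workers, so module-level names like beam are available inside lambdas.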

As for the second error, europe-west4 vs. europe-west3 shouldn't matter, but TypeError: '_UnwindowedValues' indicates that you're trying to use [index] on the result of a GroupByKey, which is generally only an iterable, not a list (as it may not fit into memory). Instead of

def process_gbk_results(key, values):
  # do something with values[0]  # fails: values is an iterable, not a list

input | beam.GroupByKey() | beam.MapTuple(process_gbk_results)

do something like

def process_gbk_results(key, values):
  for value in values:
    # do something with value

or

def process_gbk_results(key, values):
  # do something with next(iter(values))

or

def process_gbk_results(key, values):
  values = list(values)
  # do something with values[0]
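The failure mode can be reproduced without Beam at all: a plain generator, like the _UnwindowedValues object Dataflow hands you, supports iteration but not indexing. A minimal sketch (the values here are made up):

```python
def make_values():
    # A generator stands in for the _UnwindowedValues iterable
    # produced by GroupByKey on Dataflow: iterable, but not indexable.
    yield from ["a", "b", "c"]

values = make_values()
try:
    first = values[0]           # TypeError, just like on Dataflow
except TypeError:
    first = next(iter(values))  # works: take the first element lazily
print(first)  # prints "a"
```

Note that locally, with the DirectRunner, the grouped values may happen to arrive as a real list, which is why indexing appears to work until the job runs on Dataflow.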
