Reputation: 964
I'm running a Beam pipeline on Google Dataflow. For a long time it worked well with no issues. Now, without any change to the code, when I submit it to Google Dataflow I get this error:
wrapper = lambda x: [fn(x)]
File "read_from_gcs.py", line 391, in <lambda>
NameError: name 'beam' is not defined [while running 'Covert to Row-ptransform-2054']
I already import apache_beam as beam (I use it to call all the transformations). The error points to the following code:
schema_for_dedup = (
    distinct_without_chain
    | 'Filter nulls' >> beam.Filter(
        lambda r: r['key1'] is not None and r['key2'] is not None and r['key3'] is not None)
    | 'Covert to Row' >> beam.Map(lambda val: beam.Row(
        k=val['k'],
        k2=val['k2'],
    ))
)
The weird thing is that when I run the job locally, it succeeds with no issues. But when I submit it to Dataflow with DataflowRunner, it shows these errors. I'm using apache-beam 2.32.0.
Did Google change something in the Dataflow infrastructure recently? How could it have worked a few days ago, and still work on my local computer, but produce this error now?
Has anyone faced this issue before?
Upvotes: 1
Views: 1586
Reputation: 5104
Is this the __main__ module? That could be the reason beam is not defined in your lambda when running on a remote machine: by default, the global state of __main__ (including its imports) is not pickled and shipped to the workers. You could try passing the --save_main_session pipeline option, putting all your code in another module that you import from your main module, or putting your imports inside a function, e.g.
def run():
    import apache_beam as beam
    [build your pipeline as normal]
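To illustrate why the name can go missing, here is a rough simulation using only the standard library. It is not how Beam actually serializes functions; it just rebuilds a function with a nearly empty globals dict, which is an assumption standing in for a worker where the launching script's module-level imports never ran. The names make_remote_copy, area and area_local_import are made up for the example.

```python
import builtins
import math
import types


def make_remote_copy(fn):
    """Rebuild fn with globals that contain only the builtins.

    Assumption: this stands in for a worker process where the imports
    from the launching script's __main__ module were never executed.
    """
    return types.FunctionType(fn.__code__, {"__builtins__": builtins}, fn.__name__)


def area(r):
    # Relies on the module-level `import math` being present at call time.
    return math.pi * r * r


def area_local_import(r):
    # The import is part of the function body, so it runs wherever the function runs.
    import math
    return math.pi * r * r


try:
    make_remote_copy(area)(1.0)
    remote_error = None
except NameError as exc:
    remote_error = str(exc)

# The copy of `area` fails because `math` is not in its globals.
print(remote_error)

# The copy with the local import still works.
local_import_result = make_remote_copy(area_local_import)(1.0)
print(local_import_result)
```

The same reasoning is why moving the import into run() (or into the function that uses it) helps: the lookup no longer depends on globals that only exist on the machine that launched the job.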
As for the second error, europe-west4 vs. europe-west3 shouldn't matter, but TypeError: '_UnwindowedValues' indicates that you're trying to use [index] with the result of a GroupByKey, which is generally only an iterable, not a list (as it may not fit into memory). Instead of
def process_gbk_results(key, values):
    # do something with values[0]

input | beam.GroupByKey() | beam.MapTuple(process_gbk_results)
do something like
def process_gbk_results(key, values):
    for value in values:
        # do something with value
or
def process_gbk_results(key, values):
    # do something with next(iter(values))
or
def process_gbk_results(key, values):
    values = list(values)
    # do something with values[0]
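The three variants can be tried without a pipeline. The sketch below uses a hypothetical IterableOnly class as a stand-in for the grouped values (it is not Beam's actual class; it only shares the property of being iterable but not indexable), and the function names are invented for the example.

```python
class IterableOnly:
    """Hypothetical stand-in for grouped values: iterable, but no [index] support."""

    def __init__(self, items):
        self._items = items

    def __iter__(self):
        return iter(self._items)


def first_by_index(key, values):
    # Fails on an iterable-only object: it has no __getitem__.
    return key, values[0]


def sum_by_loop(key, values):
    # Streams through the values without materializing them.
    total = 0
    for value in values:
        total += value
    return key, total


def first_by_iter(key, values):
    # Takes only the first element, again without building a list.
    return key, next(iter(values))


def first_by_list(key, values):
    # Materializes everything; only reasonable if the group fits in memory.
    values = list(values)
    return key, values[0]


grouped = IterableOnly([3, 1, 2])

try:
    first_by_index("a", grouped)
    index_error = None
except TypeError as exc:
    index_error = exc

print(type(index_error).__name__)
print(sum_by_loop("a", grouped))
print(first_by_iter("a", grouped))
print(first_by_list("a", grouped))
```

Of the three, the loop and next(iter(...)) versions avoid holding the whole group in memory, which matters when a key has many values.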
Upvotes: 2