Reputation: 103
I would like to obtain row schemas in Apache Beam (Python) for use with SQL transforms. However, I ran into the issue explained below.
The schema is defined as follows:
import typing
from apache_beam import coders

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)
The following example infers the schema correctly:
with beam.Pipeline(options=pipeline_options) as p:
    pcol = (p
            | "Create" >> beam.Create(
                [
                    RowSchema(colA='a1', colB='b1'),
                    RowSchema(colA='a2', colB=None)])
                .with_output_types(RowSchema)
            | beam.Map(print)
            )
The following attempt, however, raises "ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'"
with beam.Pipeline(options=pipeline_options) as p:
    pcol = (p
            | "Create" >> beam.Create(
                [
                    {'colA': 'a1', 'colB': 'b1'},
                    {'colA': 'a2', 'colB': None}])
            | 'ToRow' >> beam.Map(
                lambda x: RowSchema(**x))
                .with_output_types(RowSchema)
            | beam.Map(print)
            )
Full stack trace:
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "home/src/main.py", line 326, in <module>
run()
File "home/src/main.py", line 267, in run
| 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
pardo = FlatMap(wrapper, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
super().__init__(fn, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
self.fn = pickler.loads(pickler.dumps(self.fn))
File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
return desired_pickle_lib.loads(
File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
return dill.loads(s)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
t = collections.namedtuple(name, fieldnames)
File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'
The failed attempt works if I change the schema definition to
RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
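For reference, a minimal sketch of that workaround with the coder registration kept the same as above (imports shown for completeness):

import typing
from apache_beam import coders

# Functional NamedTuple syntax; the registration line is unchanged
# from the class-based definition.
RowSchema = typing.NamedTuple(
    'RowSchema', [('colA', str), ('colB', typing.Optional[str])])
coders.registry.register_coder(RowSchema, coders.RowCoder)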
The failing snippet seems to be written correctly according to some of the references below.
References:
Tested on Python 3.9, Beam 2.37.0, and multiple runners including DirectRunner, DataflowRunner and PortableRunner.
Upvotes: 1
Views: 1523
Reputation: 103
Solved it by simply moving the schema definition to module level, outside the run function. Defined inside run, the NamedTuple gets the qualified name 'run.<locals>.RowSchema', which dill cannot recreate when it pickles the DoFn; at module level the class can be pickled by reference. A fuller sketch follows the snippet below.
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)

def run(argv=None, save_main_session=True):
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...
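For completeness, a minimal sketch of the full layout that fixes the originally failing dict-to-row pipeline; PipelineOptions(argv) here is just a placeholder for however you actually build pipeline_options:

import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions


# Schema and coder registration live at module level, not inside run().
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


coders.registry.register_coder(RowSchema, coders.RowCoder)


def run(argv=None, save_main_session=True):
    # Placeholder options; substitute your own runner/project settings.
    pipeline_options = PipelineOptions(argv)
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Create' >> beam.Create([
             {'colA': 'a1', 'colB': 'b1'},
             {'colA': 'a2', 'colB': None}])
         | 'ToRow' >> beam.Map(
             lambda x: RowSchema(**x)).with_output_types(RowSchema)
         | beam.Map(print))


if __name__ == '__main__':
    run()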
Upvotes: 2