Nivaldo T

Reputation: 103

Defining a Python Apache Beam schema from dictionary rows

I want to attach a row schema to a PCollection in Apache Beam (Python) so that it can be used with SQL transforms, but I ran into the issue described below.

The schema is defined as follows:

class RowSchema(typing.NamedTuple):
  colA: str
  colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)

The following example infers the schema correctly:

  with beam.Pipeline(options=pipeline_options) as p:

    pcol = (
        p
        | "Create" >> beam.Create([
            RowSchema(colA='a1', colB='b1'),
            RowSchema(colA='a2', colB=None),
        ]).with_output_types(RowSchema)
        | beam.Map(print)
    )

The following attempt, which builds the rows from dictionaries, raises "ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'":

  with beam.Pipeline(options=pipeline_options) as p:

    pcol = (
        p
        | "Create" >> beam.Create([
            {'colA': 'a1', 'colB': 'b1'},
            {'colA': 'a2', 'colB': None},
        ])
        | 'ToRow' >> beam.Map(
            lambda x: RowSchema(**x)).with_output_types(RowSchema)
        | beam.Map(print)
    )

Full stack trace:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "home/src/main.py", line 326, in <module>
    run()
  File "home/src/main.py", line 267, in run
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
    pardo = FlatMap(wrapper, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
    super().__init__(fn, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
  File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
    return desired_pickle_lib.loads(
  File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
    return dill.loads(s)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
    t = collections.namedtuple(name, fieldnames)
  File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
    raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'

The failing attempt works if I change the schema definition to the functional form:

RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
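
For what it is worth, a quick check outside Beam shows one observable difference between the two definitions. This is a minimal sketch using only the standard library; `make_schemas` is a hypothetical stand-in for my `run` function, and whether this difference is what Beam or dill trips over is my assumption based on the name in the error message.

```python
import typing


def make_schemas():
    # Class syntax inside a function: the class records a nested __qualname__.
    class RowSchema(typing.NamedTuple):
        colA: str
        colB: typing.Optional[str]

    # Functional syntax inside the same function.
    FunctionalRowSchema = typing.NamedTuple(
        'RowSchema', [('colA', str), ('colB', typing.Optional[str])])

    return RowSchema, FunctionalRowSchema


class_form, functional_form = make_schemas()

# Both report the same short name.
print(class_form.__name__, functional_form.__name__)
# Only the class form carries the enclosing function in its qualified name.
print(class_form.__qualname__)
print(functional_form.__qualname__)
```

The qualified name of the class form contains `<locals>`, so it is not a valid identifier, while the functional form keeps the plain name `RowSchema`.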

As far as I can tell, the failing snippet follows the usage shown in the references I consulted.

Tested on Python 3.9, Beam 2.37.0, and multiple runners including DirectRunner, DataflowRunner and PortableRunner.

Upvotes: 1

Views: 1523

Answers (1)

Nivaldo T

Reputation: 103

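I solved it by moving the schema definition out of the run function, to module level. The traceback shows dill recreating the named tuple under the name 'run.<locals>.RowSchema', which is the qualified name of a class defined inside run() and is not a valid identifier. At module level the qualified name is just 'RowSchema':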

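import typing

import apache_beam as beam
from apache_beam import coders

# Defined at module level so the class's qualified name is plain 'RowSchema'.
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)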

def run(argv=None, save_main_session=True):
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...
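
The last frame of the traceback can be reproduced with the standard library alone, which seems to confirm the cause. This is a minimal sketch, assuming the name dill passes to `collections.namedtuple` is the one shown in the error message; `try_namedtuple` and `FIELDS` are hypothetical helper names, not part of Beam or dill.

```python
import collections

FIELDS = ['colA', 'colB']


def try_namedtuple(type_name):
    """Return the created class, or the ValueError message if the name is rejected."""
    try:
        return collections.namedtuple(type_name, FIELDS)
    except ValueError as exc:
        return str(exc)


# The qualified name of a class defined inside run() is rejected.
nested_result = try_namedtuple('run.<locals>.RowSchema')
# The plain name of a module-level class is accepted.
top_level_result = try_namedtuple('RowSchema')

print(nested_result)
print(top_level_result(colA='a1', colB=None))
```

The first call fails with the same "valid identifiers" message as in the question, while the second returns a usable named tuple class.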

Upvotes: 2
