Norio Akagi

Reputation: 725

Apache Beam - Clarifying an expected behavior of output typehint on Python SDK

I'm trying to understand the Apache Beam Python SDK internals and am currently reading the type-check code. I wrote a very simple pipeline as below:

class AddZeroFn(beam.DoFn):
  def process(self, element):
    return [element + '0']

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    numbers = p | beam.Create(['1', '2', '3'])
    numbers = numbers | beam.ParDo(AddZeroFn())
    numbers | 'Write' >> WriteToText('result.txt')

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The result is

10
20
30

Okay, then I added a type check for the input, like

numbers = numbers | beam.ParDo(AddZeroFn().with_input_types(str))

This is fine, and it raises an error if I change str to int, as expected:

apache_beam.typehints.decorators.TypeCheckError:
Type hint violation for 'ParDo(AddZeroFn)':
requires <type 'int'> but got <type 'str'> for element

However, when I added an output type check like

numbers = numbers | beam.ParDo(AddZeroFn().with_output_types(float))

it just ran without any issue. No error is raised, although I expected the same kind of error as with the input type hint. Am I misunderstanding the usage of the output type hint? If so, could I ask how with_output_types is expected to behave?

Also, ptransform.type_check_inputs_or_outputs has the following lines:

  if pvalue_.element_type is None:
    # TODO(robertwb): It's a bug that we ever get here. (typecheck)
    continue
  if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '%s type hint violation at %s%s: expected %s, got %s' % (
            input_or_output.title(), self.label, at_context, hint,
            pvalue_.element_type))

However, if I add a print statement in the first if block, I see that in many cases the program enters that block, which means the type check is skipped. I'd appreciate it if anyone could help me understand the current correct behavior regarding type hints.
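To make the observation above concrete, here is a minimal stand-alone sketch of the check's control flow (plain Python with hypothetical names, not Beam's actual implementation): when no element type was inferred for the PCollection, the hint is never compared at all.

```python
# Simplified stand-in for the logic in ptransform.type_check_inputs_or_outputs.
# `check_hint` is a hypothetical name; issubclass stands in for
# typehints.is_consistent_with.
def check_hint(element_type, hint):
    """Return 'skipped' when no element type is known, 'ok' otherwise."""
    if element_type is None:
        # Mirrors the `continue` branch: nothing inferred, nothing to compare.
        return 'skipped'
    if hint is not None and not issubclass(element_type, hint):
        raise TypeError(
            'type hint violation: expected %s, got %s' % (hint, element_type))
    return 'ok'

print(check_hint(None, float))  # 'skipped' -- the branch the print statement hits
print(check_hint(str, str))     # 'ok' -- the check actually runs
```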

Apache Beam's version is 2.2.0 (I also tested with 2.3.0dev0).

Added (2017-12-27): I had been testing with DirectRunner, but after switching to DataflowRunner I now see the following error. Is this what we expect to see when we set with_output_types? When I set with_input_types(int), it fails before the job is sent to Dataflow, so I thought the same thing would happen for output types as well.

(7b12756b863da949): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 99, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 442, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
    return get_varint_size(value)
  File "apache_beam/coders/stream.pyx", line 222, in apache_beam.coders.stream.get_varint_size
    cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
TypeError: an integer is required [while running 'ParDo(AddZeroFn)']

Upvotes: 1

Views: 2905

Answers (1)

robertwb

Reputation: 5104

Specified output types are only used to ensure agreement with subsequent transforms. For example, if you wrote

numbers2 = numbers | beam.ParDo(AddZeroFn().with_output_types(float)) 
numbers2 | beam.ParDo(...).with_input_types(str)

you would get an error.
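A self-contained illustration of this behavior (plain Python with hypothetical helper names, not Beam's real code): the declared output type is only compared against a downstream transform's declared input type at pipeline-construction time; the elements themselves are not validated against it.

```python
# Hypothetical sketch: issubclass stands in for typehints.is_consistent_with.
def is_consistent(declared_output, required_input):
    return issubclass(declared_output, required_input)

def check_chain(output_hint, next_input_hint):
    """Compare a declared output hint with the next transform's input hint."""
    if next_input_hint is not None and not is_consistent(
            output_hint, next_input_hint):
        raise TypeError(
            'Type hint violation: requires %s but got %s'
            % (next_input_hint, output_hint))

# with_output_types(float) alone: nothing downstream to disagree with,
# so construction succeeds even though the DoFn really emits strings.
check_chain(float, None)

# with_output_types(float) followed by with_input_types(str): rejected
# at construction time.
try:
    check_chain(float, str)
except TypeError as e:
    print(e)
```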

Upvotes: 2
