Reputation: 725
I'm trying to understand the Apache Beam Python SDK internals and am currently reading the typecheck part. I wrote a very simple pipeline as below:
class AddZeroFn(beam.DoFn):
    """A DoFn that appends the character '0' to every incoming element."""

    def process(self, element):
        # Concatenate '0' and emit the result as a one-element list.
        suffixed = element + '0'
        return [suffixed]
def run(argv=None):
    """Build and execute the example pipeline.

    Creates the strings '1', '2', '3', appends '0' to each via AddZeroFn,
    and writes the results to 'result.txt'.

    Args:
        argv: Optional list of command-line arguments. Arguments not
            recognized by the (empty) parser are forwarded to Beam as
            pipeline options.
    """
    arg_parser = argparse.ArgumentParser()
    _, beam_args = arg_parser.parse_known_args(argv)
    # pipeline_type_check=True enables Beam's pipeline-construction-time
    # type checking for this run.
    options = PipelineOptions(beam_args, pipeline_type_check=True)
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as pipeline:
        strings = pipeline | beam.Create(['1', '2', '3'])
        suffixed = strings | beam.ParDo(AddZeroFn())
        suffixed | 'Write' >> WriteToText('result.txt')
if __name__ == '__main__':
    # Show pipeline progress messages when executed as a script.
    logging.getLogger().setLevel(logging.INFO)
    run()
Then a result is
10
20
30
Okey, then I added a type check for input, like
numbers = numbers | beam.ParDo(AddZeroFn().with_input_types(str))
This is fine, and it raises an error if I change str
to int
as expected.
apache_beam.typehints.decorators.TypeCheckError:
Type hint violation for 'ParDo(AddZeroFn)':
requires <type 'int'> but got <type 'str'> for element
However, when I added output type check like
numbers = numbers | beam.ParDo(AddZeroFn().with_output_types(float))
It just ran without any issue. No error is raised, while I thought I would see the same error as with the input typehint. Am I misunderstanding the usage of output typehints? If so, could I ask how with_output_types
is expected to behave?
Also ptransform.type_check_inputs_or_outputs
has lines as below
if pvalue_.element_type is None:
# TODO(robertwb): It's a bug that we ever get here. (typecheck)
continue
if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
at_context = ' %s %s' % (input_or_output, context) if context else ''
raise TypeCheckError(
'%s type hint violation at %s%s: expected %s, got %s' % (
input_or_output.title(), self.label, at_context, hint,
pvalue_.element_type))
However, if I put a print statement in the first if block, I see that in many cases the program enters that block, which means the type check is skipped. I'd appreciate it if anyone could help me understand the current correct behavior regarding typehints.
Apache Beam's version is 2.2.0. (I also tested with 2.3.0dev0)
Added (2017-12-27):
I've been testing by using DirectRunner, but changed to DataflowRunner and now see a following error. Is this what we expect to see when we set with_output_types
? When I set with_input_types(int)
, it fails before sending a job to Dataflow, so I thought the same thing'd happen on output types as well.
(7b12756b863da949): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn, None, original_traceback
File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_processor.process_outputs(
File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 99, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 442, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
return get_varint_size(value)
File "apache_beam/coders/stream.pyx", line 222, in apache_beam.coders.stream.get_varint_size
cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
TypeError: an integer is required [while running 'ParDo(AddZeroFn)']
Upvotes: 1
Views: 2905
Reputation: 5104
Specified output types are only used to ensure agreement with subsequent transforms. For example if you wrote
numbers2 = numbers | beam.ParDo(AddZeroFn().with_output_types(float))
numbers2 | beam.ParDo(...).with_input_types(str)
you would get an error.
Upvotes: 2