Reputation: 485
I have a pipeline using the Python SDK 2.2.0 for Apache Beam.
This pipeline is almost a typical word count: I have pairs of names in the format ("John Doe, Jane Smith", 1), and I'm trying to figure out how many times each pair of names appears together, like this:
p_collection
| "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
| "GroupByKey" >> beam.GroupByKey()
| "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
| "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})
When I run this code locally, with a small dataset, it works perfectly.
But when I deploy it to Google Cloud Dataflow, I get the following error:
An exception was raised when trying to execute the workitem 423109085466017585 : Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 233, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 518, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
    values[i], nested=nested or i + 1 < len(self._coder_impls)))
RuntimeError: KeyError: 0 [while running 'Transform/Format']
Looking at the source code where this error is raised, I thought it might be caused by some of the names containing oddly encoded characters, so in a desperate attempt I added the .encode("ascii", errors="ignore").decode() you see in the code, but no luck.
Any ideas as to why this pipeline executes successfully locally, but fails on the Dataflow runner?
Thanks!
Upvotes: 3
Views: 4141
Reputation: 385
GroupByKey groups all elements with the same key and produces a PCollection of (key, Iterable) pairs, so the next stage receives an Iterable collecting all elements with the same key. The important point is that this Iterable is evaluated lazily, at least when GroupByKey is executed on the Dataflow runner: elements are loaded into memory on demand, as they are requested from the iterator.
CombinePerKey, on the other hand, also groups all elements with the same key, but performs an aggregation before emitting a single value per key.
pcollection_obj
| "MapWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
| "GroupByKeyAndSum" >> beam.CombinePerKey(sum)
| "CreateDictionary" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})
@Hannon César I hope this answers your question. Cheers!
Upvotes: 0
Reputation: 547
In some cases, like my own, you need the intermediate grouped values, so CombinePerKey isn't ideal. In this more general case, you can replace GroupByKey() with CombineValues(ToListCombineFn()).
I'm not confident as to why this works while GroupByKey doesn't. My guess is that consuming the _UnwindowedValues iterable returned by GroupByKey as if it were a list fails in a parallel execution environment. I was doing something like:
... | beam.GroupByKey()
| beam.Map(lambda k_v: (k_v[0], foo(list(k_v[1]))))
| ...
where foo requires the full, indexable list and is not easily composable. I'm not sure why this sort of restriction would have caused issues for you, though; sum can operate on an iterable.
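As a concrete sketch of that replacement, as I read it (keep GroupByKey and materialize each group with CombineValues before the list-consuming step; kv_pcollection and this version of foo are placeholders, and ToListCombineFn lives in apache_beam.transforms.combiners):

import apache_beam as beam
from apache_beam.transforms.combiners import ToListCombineFn

def foo(values):
    # Hypothetical stand-in for a function that needs a real,
    # indexable list rather than a lazy iterable.
    return (values[0], len(values))

result = (
    kv_pcollection  # placeholder: a PCollection of (key, value) pairs
    | beam.GroupByKey()
    # Turn each group's lazy iterable into a concrete in-memory list.
    | beam.CombineValues(ToListCombineFn())
    | beam.Map(lambda k_v: (k_v[0], foo(k_v[1])))
)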
This solution isn't ideal in that (I believe) you lose some parallelization with the ToList conversion. That being said, at least it's an option if anybody else faces this same issue!
Upvotes: 1
Reputation: 485
This isn't so much a fix for my problem as a way of avoiding it in the first place, but it did make my code run, thanks to user1093967's suggestion in the comments.
I just replaced the GroupByKey and AggregateGroups steps with a single CombinePerKey(sum) step, and the problem didn't occur anymore.
p_collection
| "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
| "GroupAndSum" >> beam.CombinePerKey(sum)
| "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})
I'd be happy to hear why it works, though.
Upvotes: 4