I'm working on an Apache Beam pipeline in an AI Platform Jupyter Notebook environment on Google Cloud Platform (Apache Beam 2.28.0, Python 3). Walking through the code I had to get my hands on, I came across a transformation applied this way: new_pColl = pColl | "Process images" >> ProcessImages(*args)
where ProcessImages is a class defined by:
import datetime
import json
import time
from ssl import SSLError
import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(PTransform):
    def __init__(self, creative_source_type):
        self.operations_map = {
            'image_properties_annotation': image,
            'text_annotations': text,
            'safe_search_annotation': safe_search,
            'label_annotations': label,
            'logo_annotations': logo,
            'face_annotations': face,
            'object_annotations': objects,
        }
        self.creative_source_type = creative_source_type

    def expand(self, pColl):
        image_endpoints = ['image_properties_annotation', 'text_annotations',
                           'safe_search_annotation', 'label_annotations',
                           'logo_annotations', 'face_annotations', 'object_annotations']
        annotated_creatives = (
            pColl | 'Annotate image creatives' >> beam.ParDo(
                ExtractImageMetadata())
            | 'Extract Top Level Features' >> beam.ParDo(
                base.Extract()))
        features = {}
        features['base'] = annotated_creatives | 'collect base features' >> beam.ParDo(
            base.Collect())
        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f'Filter {endpoint}' >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint)
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract())
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect()))
            features[endpoint] = endpoint_features
        performance_DoFn = PerformanceCollectFactory(
        ).CreatePerformanceCollect
        features['performance'] = pColl | 'extract performance' >> beam.ParDo(performance_DoFn,
                                                                              self.creative_source_type)
        return (features | 'Group features' >> beam.CoGroupByKey()
                | 'Flatten features' >> beam.ParDo(Flatten()))
What I'd like is to refactor this piece of code into a more classical approach, where the transformation is applied with new_pColl = pColl | "Process images" >> beam.ParDo(ProcessImages(), *args), like the other steps of my pipeline.
I tried several times but always end up with an error, and I don't know how to refactor things properly. Here is the class definition I wrote, matching how I would like to use it:
import datetime
import json
import time
from ssl import SSLError
import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(beam.DoFn):
    def process_element(self, element, creative_source_type):
        self.operations_map = {
            "image_properties_annotation": image,
            "text_annotations": text,
            "safe_search_annotation": safe_search,
            "label_annotations": label,
            "logo_annotations": logo,
            "face_annotations": face,
            "object_annotations": objects,
        }
        self.creative_source_type = creative_source_type
        image_endpoints = [k for k in self.operations_map.keys()]
        annotated_creatives = (
            element | "Annotate image creatives" >> beam.ParDo(
                ExtractImageMetadata())
            | "Extract Top Level Features" >> beam.ParDo(
                base.Extract())
        )
        features = {}
        features["base"] = annotated_creatives | "Collect base features" >> beam.ParDo(
            base.Collect()
        )
        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f"Filter {endpoint}" >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint
                                 )
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract()
                                 )
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect())
                                 )
            features[endpoint] = endpoint_features
        features["performance"] = element | "Extract performance" >> beam.ParDo(PerformanceCollectFactory().CreatePerformanceCollect,
                                                                              self.creative_source_type)
        return (
            features | "Group features" >> beam.CoGroupByKey()
            | "Flatten features" >> beam.ParDo(Flatten())
        )
And the error I get when running the pipeline:
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
637 """
--> 638 raise NotImplementedError
639
NotImplementedError:
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-13-0f9bbda0a56f> in <module>
----> 1 ib.show(process_images)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
226 def run_within_progress_indicator(*args, **kwargs):
227 with ProgressIndicator('Processing...', 'Done.'):
--> 228 return func(*args, **kwargs)
229
230 return run_within_progress_indicator
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
484 recording_manager = ie.current_env().get_recording_manager(
485 user_pipeline, create_if_absent=True)
--> 486 recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
487
488 # Catch a KeyboardInterrupt to gracefully cancel the recording and
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
441 category=DeprecationWarning)
442 pf.PipelineFragment(list(uncomputed_pcolls),
--> 443 self.user_pipeline.options).run()
444 result = ie.current_env().pipeline_result(self.user_pipeline)
445 else:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in run(self, display_pipeline_graph, use_cache, blocking)
114 self._runner_pipeline.runner._force_compute = not use_cache
115 self._runner_pipeline.runner._blocking = blocking
--> 116 return self.deduce_fragment().run()
117 finally:
118 self._runner_pipeline.runner._skip_display = preserved_skip_display
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
557 finally:
558 shutil.rmtree(tmpdir)
--> 559 return self.runner.run_pipeline(self, self._options)
560 finally:
561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
196
197 main_job_result = PipelineResult(
--> 198 pipeline_to_execute.run(), pipeline_instrument)
199 # In addition to this pipeline result setting, redundant result setting from
200 # outer scopes are also recommended since the user_pipeline might not be
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
557 finally:
558 shutil.rmtree(tmpdir)
--> 559 return self.runner.run_pipeline(self, self._options)
560 finally:
561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
131 runner = BundleBasedDirectRunner()
132
--> 133 return runner.run_pipeline(pipeline, options)
134
135
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
181
182 self._latest_run_result = self.run_via_runner_api(
--> 183 pipeline.to_runner_api(default_environment=self._default_environment))
184 return self._latest_run_result
185
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
191 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
192 # the teststream (if any), and all the stages).
--> 193 return self.run_stages(stage_context, stages)
194
195 @contextlib.contextmanager
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
357 stage_results = self._run_stage(
358 runner_execution_context,
--> 359 bundle_context_manager,
360 )
361 monitoring_infos_by_stage[stage.name] = (
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
553 input_timers,
554 expected_timer_output,
--> 555 bundle_manager)
556
557 final_result = merge_results(last_result)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
593
594 result, splits = bundle_manager.process_bundle(
--> 595 data_input, data_output, input_timers, expected_timer_output)
596 # Now we collect all the deferred inputs remaining from bundle execution.
597 # Deferred inputs can be:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
894 process_bundle_descriptor.id,
895 cache_tokens=[next(self._cache_token_generator)]))
--> 896 result_future = self._worker_handler.control_conn.push(process_bundle_req)
897
898 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
378 self._uid_counter += 1
379 request.instruction_id = 'control_%s' % self._uid_counter
--> 380 response = self.worker.do_instruction(request)
381 return ControlFuture(request.instruction_id, response)
382
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
605 # E.g. if register is set, this will call self.register(request.register))
606 return getattr(self, request_type)(
--> 607 getattr(request, request_type), request.instruction_id)
608 else:
609 raise NotImplementedError
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
642 with self.maybe_profile(instruction_id):
643 delayed_applications, requests_finalization = (
--> 644 bundle_processor.process_bundle(instruction_id))
645 monitoring_infos = bundle_processor.monitoring_infos()
646 monitoring_infos.extend(self.state_cache_metrics_fn())
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
998 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
999 input_op_by_transform_id[element.transform_id].process_encoded(
-> 1000 element.data)
1001
1002 # Finish all operations.
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
226 decoded_value = self.windowed_coder_impl.decode_from_stream(
227 input_stream, True)
--> 228 self.output(decoded_value)
229
230 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
444 if traceback == Ellipsis:
445 _, _, traceback = sys.exc_info()
--> 446 raise exc.with_traceback(traceback)
447
448 else:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
636 An Iterable of output elements or None.
637 """
--> 638 raise NotImplementedError
639
640 def setup(self):
RuntimeError: NotImplementedError [while running '[10]: Process images']
I cannot figure out how to implement this transform properly. What am I configuring wrong? Am I passing my ProcessImages transform an argument that cannot be treated as a PCollection?
I think there are two issues here. First, if you subclass DoFn, you need to implement a process method, as opposed to the process_element method you have now. Per the documentation:
Method to use for processing elements. This is invoked by DoFnRunner for each element of a input PCollection.
So the runner is trying to invoke a method that you haven't implemented, which explains the error you got.
Maybe the bigger issue is that, as noted above, ParDo invokes the DoFn with each element of the input PCollection as an argument, not with the PCollection itself. The logic you're performing works on the PCollection as a whole, so you can't use a DoFn to do that. For example:
annotated_creatives = (
    element | "Annotate image creatives" >> beam.ParDo(
        ExtractImageMetadata())
    | "Extract Top Level Features" >> beam.ParDo(
        base.Extract())
)
Here, element is not a PCollection, so it does not have an | (or __ror__) method. What you're trying to implement is a reusable section of the pipeline, which is exactly what a PTransform is for, as in your original implementation. I would recommend reading the "Composite transforms" section of the Beam programming guide, which covers these concepts.