antounes

Defining beam.ParDo stage of a pipeline

I'm working on an Apache Beam pipeline in an AI Platform Jupyter Notebook environment on Google Cloud Platform (Apache Beam 2.28.0 for Python 3). While walking through the code I had to work on, I came across a transformation applied this way: new_pColl = pColl | "Process images" >> ProcessImages(*args), where ProcessImages is defined as follows:

import datetime
import json
import time
from ssl import SSLError

import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError

from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
                                 text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(PTransform):
    def __init__(self, creative_source_type):
        self.operations_map = {
            'image_properties_annotation': image,
            'text_annotations': text,
            'safe_search_annotation': safe_search,
            'label_annotations': label,
            'logo_annotations': logo,
            'face_annotations': face,
            'object_annotations': objects,
        }
        self.creative_source_type = creative_source_type

    def expand(self, pColl):
        image_endpoints = ['image_properties_annotation', 'text_annotations',
                           'safe_search_annotation', 'label_annotations',
                           'logo_annotations', 'face_annotations', 'object_annotations']

        annotated_creatives = (
            pColl | 'Annotate image creatives' >> beam.ParDo(
                ExtractImageMetadata())
            | 'Extract Top Level Features' >> beam.ParDo(
                base.Extract()))

        features = {}

        features['base'] = annotated_creatives | 'collect base features' >> beam.ParDo(
            base.Collect())

        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f'Filter {endpoint}' >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint)
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract())
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect()))
            features[endpoint] = endpoint_features

        performance_DoFn = PerformanceCollectFactory().CreatePerformanceCollect
        features['performance'] = pColl | 'extract performance' >> beam.ParDo(
            performance_DoFn, self.creative_source_type)

        return (features | 'Group features' >> beam.CoGroupByKey()
                | 'Flatten features' >> beam.ParDo(Flatten()))

My goal is to refactor that piece of code into a more conventional form, where the transformation is called with new_pColl = pColl | "Process images" >> beam.ParDo(ProcessImages(), *args), like the other steps of my pipeline.

I tried several times, but I always end up with an error and don't know how to refactor things properly. Here is the class definition I wrote, matching how I would like to use it:

import datetime
import json
import time
from ssl import SSLError

import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError

from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
                                 text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(beam.DoFn):
    def process_element(self, element, creative_source_type):
        self.operations_map = {
            "image_properties_annotation": image,
            "text_annotations": text,
            "safe_search_annotation": safe_search,
            "label_annotations": label,
            "logo_annotations": logo,
            "face_annotations": face,
            "object_annotations": objects,
        }
        self.creative_source_type = creative_source_type
        
        image_endpoints = [k for k in self.operations_map.keys()]
        
        annotated_creatives = (
            element | "Annotate image creatives" >> beam.ParDo(
                ExtractImageMetadata())
            | "Extract Top Level Features" >> beam.ParDo(
                base.Extract())
        )
        
        features = {}
        
        features["base"] = annotated_creatives | "Collect base features" >> beam.ParDo(
            base.Collect()
        )
        
        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f"Filter {endpoint}" >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint
                                 )
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract()
                                 )
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect())
                                )
            features[endpoint] = endpoint_features
            
        features["performance"] = element | "Extract performance" >> beam.ParDo(
            PerformanceCollectFactory().CreatePerformanceCollect,
            self.creative_source_type)
        return (
            features | "Group features" >> beam.CoGroupByKey()
            | "Flatten features" >> beam.ParDo(Flatten())
        )

And the error I get when running the pipeline:

    ---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
    637     """
--> 638     raise NotImplementedError
    639 

NotImplementedError: 

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-13-0f9bbda0a56f> in <module>
----> 1 ib.show(process_images)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
    226   def run_within_progress_indicator(*args, **kwargs):
    227     with ProgressIndicator('Processing...', 'Done.'):
--> 228       return func(*args, **kwargs)
    229 
    230   return run_within_progress_indicator

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
    484   recording_manager = ie.current_env().get_recording_manager(
    485       user_pipeline, create_if_absent=True)
--> 486   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
    487 
    488   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
    441           category=DeprecationWarning)
    442       pf.PipelineFragment(list(uncomputed_pcolls),
--> 443                           self.user_pipeline.options).run()
    444       result = ie.current_env().pipeline_result(self.user_pipeline)
    445     else:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in run(self, display_pipeline_graph, use_cache, blocking)
    114       self._runner_pipeline.runner._force_compute = not use_cache
    115       self._runner_pipeline.runner._blocking = blocking
--> 116       return self.deduce_fragment().run()
    117     finally:
    118       self._runner_pipeline.runner._skip_display = preserved_skip_display

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    557         finally:
    558           shutil.rmtree(tmpdir)
--> 559       return self.runner.run_pipeline(self, self._options)
    560     finally:
    561       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
    196 
    197     main_job_result = PipelineResult(
--> 198         pipeline_to_execute.run(), pipeline_instrument)
    199     # In addition to this pipeline result setting, redundant result setting from
    200     # outer scopes are also recommended since the user_pipeline might not be

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    557         finally:
    558           shutil.rmtree(tmpdir)
--> 559       return self.runner.run_pipeline(self, self._options)
    560     finally:
    561       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    131       runner = BundleBasedDirectRunner()
    132 
--> 133     return runner.run_pipeline(pipeline, options)
    134 
    135 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    181 
    182     self._latest_run_result = self.run_via_runner_api(
--> 183         pipeline.to_runner_api(default_environment=self._default_environment))
    184     return self._latest_run_result
    185 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    191     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    192     #   the teststream (if any), and all the stages).
--> 193     return self.run_stages(stage_context, stages)
    194 
    195   @contextlib.contextmanager

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    357           stage_results = self._run_stage(
    358               runner_execution_context,
--> 359               bundle_context_manager,
    360           )
    361           monitoring_infos_by_stage[stage.name] = (

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    553               input_timers,
    554               expected_timer_output,
--> 555               bundle_manager)
    556 
    557       final_result = merge_results(last_result)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    593 
    594     result, splits = bundle_manager.process_bundle(
--> 595         data_input, data_output, input_timers, expected_timer_output)
    596     # Now we collect all the deferred inputs remaining from bundle execution.
    597     # Deferred inputs can be:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
    894             process_bundle_descriptor.id,
    895             cache_tokens=[next(self._cache_token_generator)]))
--> 896     result_future = self._worker_handler.control_conn.push(process_bundle_req)
    897 
    898     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    378       self._uid_counter += 1
    379       request.instruction_id = 'control_%s' % self._uid_counter
--> 380     response = self.worker.do_instruction(request)
    381     return ControlFuture(request.instruction_id, response)
    382 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    605       # E.g. if register is set, this will call self.register(request.register))
    606       return getattr(self, request_type)(
--> 607           getattr(request, request_type), request.instruction_id)
    608     else:
    609       raise NotImplementedError

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    642         with self.maybe_profile(instruction_id):
    643           delayed_applications, requests_finalization = (
--> 644               bundle_processor.process_bundle(instruction_id))
    645           monitoring_infos = bundle_processor.monitoring_infos()
    646           monitoring_infos.extend(self.state_cache_metrics_fn())

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    998           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
    999             input_op_by_transform_id[element.transform_id].process_encoded(
-> 1000                 element.data)
   1001 
   1002       # Finish all operations.

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    226       decoded_value = self.windowed_coder_impl.decode_from_stream(
    227           input_stream, True)
--> 228       self.output(decoded_value)
    229 
    230   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.ConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
    444         if traceback == Ellipsis:
    445             _, _, traceback = sys.exc_info()
--> 446         raise exc.with_traceback(traceback)
    447 
    448 else:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
    636       An Iterable of output elements or None.
    637     """
--> 638     raise NotImplementedError
    639 
    640   def setup(self):

RuntimeError: NotImplementedError [while running '[10]: Process images']

I cannot figure out how to implement that transform properly. What am I configuring wrong? Am I passing my ProcessImages transform an argument that cannot be treated as a PCollection?

Answers (1)

jbrr

I think there are two issues here. First, if you subclass DoFn, you need to implement a process method, as opposed to the process_element method you have now. Per the documentation:

Method to use for processing elements.
This is invoked by DoFnRunner for each element of a input PCollection.

So the runner calls the base DoFn.process, which you haven't overridden and which simply raises NotImplementedError. That is exactly the error in your traceback.

Maybe the bigger issue is that, as the documentation above says, ParDo invokes the DoFn once for each element of the input PCollection, passing that element as the argument, not the PCollection itself. The logic you're performing works on the PCollection as a whole, so you can't express it as a DoFn. For example:

annotated_creatives = (
    element | "Annotate image creatives" >> beam.ParDo(
        ExtractImageMetadata())
    | "Extract Top Level Features" >> beam.ParDo(
        base.Extract())
)

Here, element is a single element of the input PCollection, not a PCollection, so piping transforms onto it with | does not attach steps to your pipeline the way it does inside expand. What you're trying to implement is a reusable section of the pipeline, which is exactly what a PTransform is for, as in your original implementation. I would recommend reading the "Composite transforms" section of the Beam programming guide, which covers these concepts.
