Thijs

Reputation: 1546

Efficient ParDo setup or start_bundle for side input

list A: 25M hashes
list B: 175K hashes

I want to check each hash in list B for existence in list A. For this I have a ParDo function and I yield when it's not matched. This is a deduplication process.

How do I set up this ParDo efficiently? Right now I pass list A as a side input while processing list B. But shouldn't the side input be loaded in setup() or start_bundle() of the DoFn, so that the lookup list (A) is stored on the worker just once?

import apache_beam as beam

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        # hashlist is the side input built from list A
        if element['TA_HASH'] not in hashlist:
            yield element

If you have the answer, please include a link to the documentation, because I could not find any good documentation for the Python version.

Upvotes: 3

Views: 2357

Answers (2)

Alex Amato

Reputation: 1725

Sorry, I initially misunderstood the question. Actually, I don't think it's possible to access a side input in start_bundle. It is only accessible in process. But you could instead do the work on the first call to process and get a similar result.

class DoFnMethods(beam.DoFn):
  def __init__(self):
    self.first_element_processed = False
    self.once_retrieved_side_input_data = None

  def called_once(self, side_input):
    if self.first_element_processed:
      return
    self.once_retrieved_side_input_data = side_input.get(...)
    self.first_element_processed = True

  def process(self, element, side_input):
    self.called_once(side_input)
    ...

Note: start_bundle and finish_bundle are called once per bundle, across all windows, whereas the side input passed to process can differ for each window. So if you are working with windows, you may need to use a dict (keyed by window) for self.first_element_processed and self.once_retrieved_side_input_data, so that called_once runs once for each window.

Upvotes: 2

Alex Amato

Reputation: 1725

I believe that pvalue.AsDict is what you need, which will give you a dictionary style interface for the side input. You can find some examples on the Apache Beam Github search.

Here is a simplified example I just wrote, but please also see the checked-in example below (though it is a bit more complicated), in case I made a mistake.

import apache_beam as beam

class ComputeHashes(beam.DoFn):
  def process(self, element):
    # Use the hash as the key of a KV pair; the value is not used.
    # HashFunction is a placeholder for your own hash function.
    yield (HashFunction(element), True)

class FilterIfAlreadyComputedHash(beam.DoFn):
  def process(self, element, hashes):
    # hashes is the AsDict side input, keyed by hash.
    # Emit the element only if its hash is not already a key.
    if HashFunction(element) not in hashes:
      yield element

with beam.Pipeline() as p:
  initial_elements = p | 'Known' >> beam.Create(['foo'])
  computed_hashes = initial_elements | beam.ParDo(ComputeHashes())

  # Read from your pipeline's source
  more_elements = p | 'New' >> beam.Create(['foo', 'bar'])
  new_elements = more_elements | beam.ParDo(
      FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))

In the checked-in example from the Beam GitHub repo, in visionml_test.py, a PCollection is converted to a dictionary-style view using beam.pvalue.AsDict().

class VisionMlTestIT(unittest.TestCase):
  def test_text_detection_with_language_hint(self):
    IMAGES_TO_ANNOTATE = [
        'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
    ]
    IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]

    with TestPipeline(is_integration_test=True) as p:
      contexts = p | 'Create context' >> beam.Create(
          dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))

      output = (
          p
          | beam.Create(IMAGES_TO_ANNOTATE)
          | AnnotateImage(
              features=[vision.types.Feature(type='TEXT_DETECTION')],
              context_side_input=beam.pvalue.AsDict(contexts))
          | beam.ParDo(extract))

The side input is passed into a FlatMap (in visionml.py), and, in the FlatMap's function, an entry is retrieved from the dictionary with .get(). It could also be passed into a Map or ParDo. See the Beam Python side input documentation (there they use AsSingleton instead of AsDict). Below is an example of using it in the process call.

class AnnotateImage(PTransform):
  """A ``PTransform`` for annotating images using the GCP Vision API.
  ref: https://cloud.google.com/vision/docs/
  Batches elements together using ``util.BatchElements`` PTransform and sends
  each batch of elements to the GCP Vision API.
  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
  or binary_type base64-encoded image data.
  Accepts an `AsDict` side input that maps each image to an image context.
  """

  MAX_BATCH_SIZE = 5
  MIN_BATCH_SIZE = 1

  def __init__(
      self,
      features,
      retry=None,
      timeout=120,
      max_batch_size=None,
      min_batch_size=None,
      client_options=None,
      context_side_input=None,
      metadata=None):
    """
    Args:
      features: (List[``vision.types.Feature.enums.Feature``]) Required.
        The Vision API features to detect
      retry: (google.api_core.retry.Retry) Optional.
        A retry object used to retry requests.
        If None is specified (default), requests will not be retried.
      timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Optional.
        Maximum number of images to batch in the same request to the Vision API.
        Default is 5 (which is also the Vision API max).
        This parameter is primarily intended for testing.
      min_batch_size: (int) Optional.
        Minimum number of images to batch in the same request to the Vision API.
        Default is None. This parameter is primarily intended for testing.
      client_options:
        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.
        Example usage::
          image_contexts =
            [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),
            (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),]
          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )
          visionml.AnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input)))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(AnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than {}'.format(
              AnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata

  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _ImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))

  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None

    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allows text_type or binary_type
      image = vision.types.Image(content=element)

    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features, image_context=image_context)
    yield request

Note: in Java, the equivalent is View.asMap().

Upvotes: 2
