Reputation: 1546
list A: 25M hashes
list B: 175K hashes
I want to check each hash in list B for existence in list A. For this I have a ParDo function that yields the element when it is not matched; this is a deduplication process.
How do I set up this ParDo efficiently? Right now I pass list A as a side input while processing list B. But shouldn't the side input go to setup() or start_bundle() of the ParDo, so that the lookup list (A) is stored on the worker just once?
import apache_beam as beam

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        # Yield the element only if its hash is not already in list A.
        if element['TA_HASH'] not in hashlist:
            yield element
If you have the answer, please include a link to the documentation, because I did not find any good documentation for the Python version.
current_data is a PCollection from a BigQuery read:
new_records = transformed_records | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))
Upvotes: 3
Views: 2357
Reputation: 1725
Sorry, I initially misunderstood the question. Actually, I don't think it's possible to have a side input in start_bundle; it is only accessible in process. But you could instead do the work on the first call to process and get a similar result.
class DoFnMethods(beam.DoFn):
    def __init__(self):
        self.first_element_processed = False
        self.once_retrieved_side_input_data = None

    def called_once(self, side_input):
        if self.first_element_processed:
            return
        self.once_retrieved_side_input_data = side_input.get(...)
        self.first_element_processed = True

    def process(self, element, side_input):
        self.called_once(side_input)
        ...
Note: you do need to be aware that start_bundle and finish_bundle are called once per bundle across all windows, while the side input provided to process is different for each window computed. So if you are working with windows, you may need to use a dict keyed by window for the self.first_element_processed and self.once_retrieved_side_input_data variables, so that called_once runs once for each window. A sketch of that follows.
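A minimal sketch of that per-window variant, assuming the side input is passed as beam.pvalue.AsDict; the class name, attribute names, and the final membership check are illustrative assumptions, not part of the original answer:

import apache_beam as beam

class DoFnMethodsPerWindow(beam.DoFn):
    def __init__(self):
        # Cache of the materialized side input, keyed by window.
        self._side_input_by_window = {}

    def process(self, element, side_input, window=beam.DoFn.WindowParam):
        # Materialize the side input only the first time this window is seen
        # by this DoFn instance; later elements in the same window reuse it.
        if window not in self._side_input_by_window:
            self._side_input_by_window[window] = dict(side_input)
        lookup = self._side_input_by_window[window]
        if element not in lookup:
            yield element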
Upvotes: 2
Reputation: 1725
I believe that pvalue.AsDict is what you need; it will give you a dictionary-style interface for the side input. You can find some examples by searching the Apache Beam GitHub repository.
Here is a simplified example I just wrote, but please see the checked-in example below (though a bit more complicated) in case I made a mistake.
import apache_beam as beam

class ComputeHashes(beam.DoFn):
    def process(self, element):
        # Use the hash as the key of a (key, value) pair; the value is not used.
        yield (HashFunction(element), True)  # HashFunction is a placeholder for your hash function

initial_elements = p | beam.Create(['foo'])
computed_hashes = initial_elements | beam.ParDo(ComputeHashes())

class FilterIfAlreadyComputedHash(beam.DoFn):
    def process(self, element, hashes):
        # Drop the element if its hash already exists in the side-input dict.
        if not hashes.get(HashFunction(element)):
            yield element

more_elements = p | beam.Create(['foo', 'bar'])  # Read from your pipeline's source
small_words = more_elements | beam.ParDo(
    FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))
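Applied to the question's setup, the same pattern might look roughly like this. The TA_HASH field and the two BigQuery sources come from the question; the read queries, transform labels, and class name are assumptions, so treat this as a sketch rather than a drop-in solution:

import apache_beam as beam

class CheckNewRecords(beam.DoFn):
    def process(self, element, existing_hashes):
        # existing_hashes is the AsDict view of list A, so membership checks
        # are dictionary lookups instead of scans over an AsList side input.
        if element['TA_HASH'] not in existing_hashes:
            yield element

with beam.Pipeline() as p:
    # List A (~25M hashes), keyed so it can be used as a dictionary side input.
    current_hashes = (
        p
        | 'ReadCurrent' >> beam.io.ReadFromBigQuery(query='SELECT TA_HASH FROM ...')  # placeholder query
        | 'KeyByHash' >> beam.Map(lambda row: (row['TA_HASH'], True)))

    # List B (~175K records) to be checked against list A.
    transformed_records = (
        p
        | 'ReadNew' >> beam.io.ReadFromBigQuery(query='SELECT * FROM ...'))  # placeholder query

    new_records = (
        transformed_records
        | 'Checknewrecords' >> beam.ParDo(
            CheckNewRecords(), beam.pvalue.AsDict(current_hashes)))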
In the checked-in example from the Beam GitHub repo, in visionml_test.py, a PCollection is converted to the dictionary-type view using beam.pvalue.AsDict().
class VisionMlTestIT(unittest.TestCase):
  def test_text_detection_with_language_hint(self):
    IMAGES_TO_ANNOTATE = [
        'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
    ]
    IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]
    with TestPipeline(is_integration_test=True) as p:
      contexts = p | 'Create context' >> beam.Create(
          dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))
      output = (
          p
          | beam.Create(IMAGES_TO_ANNOTATE)
          | AnnotateImage(
              features=[vision.types.Feature(type='TEXT_DETECTION')],
              context_side_input=beam.pvalue.AsDict(contexts))
          | beam.ParDo(extract))
The side input is passed into a FlatMap (in visionml.py), and in the FlatMap's function an entry is retrieved from the dictionary with .get(). The side input could also be passed into a Map or ParDo; a minimal Map-with-AsSingleton sketch follows the quoted class below. See the Beam Python side input documentation (there they use .AsSingleton instead of .AsDict). Here is the example of using it in the process call:
class AnnotateImage(PTransform):
  """A ``PTransform`` for annotating images using the GCP Vision API.
  ref: https://cloud.google.com/vision/docs/

  Batches elements together using ``util.BatchElements`` PTransform and sends
  each batch of elements to the GCP Vision API.
  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
  or binary_type base64-encoded image data.
  Accepts an `AsDict` side input that maps each image to an image context.
  """

  MAX_BATCH_SIZE = 5
  MIN_BATCH_SIZE = 1

  def __init__(
      self,
      features,
      retry=None,
      timeout=120,
      max_batch_size=None,
      min_batch_size=None,
      client_options=None,
      context_side_input=None,
      metadata=None):
    """
    Args:
      features: (List[``vision.types.Feature.enums.Feature``]) Required.
        The Vision API features to detect
      retry: (google.api_core.retry.Retry) Optional.
        A retry object used to retry requests.
        If None is specified (default), requests will not be retried.
      timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Optional.
        Maximum number of images to batch in the same request to the Vision API.
        Default is 5 (which is also the Vision API max).
        This parameter is primarily intended for testing.
      min_batch_size: (int) Optional.
        Minimum number of images to batch in the same request to the Vision API.
        Default is None. This parameter is primarily intended for testing.
      client_options:
        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.
        Example usage::

          image_contexts =
            [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
              ``vision.types.ImageContext()``]),
             (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
              ``vision.types.ImageContext()``]),]

          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )

          visionml.AnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input)))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(AnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than {}'.format(
              AnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata

  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _ImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))

  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None
    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allows text_type or binary_type
      image = vision.types.Image(content=element)
    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features, image_context=image_context)
    yield request
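For comparison, here is a minimal sketch of the Map variant mentioned above, passing a single value with .AsSingleton; the pipeline, data, and transform labels are illustrative only:

import apache_beam as beam

with beam.Pipeline() as p:
    # A one-element PCollection delivered to each element as a plain value.
    threshold = p | 'CreateThreshold' >> beam.Create([10])
    numbers = p | 'CreateNumbers' >> beam.Create([4, 12, 7, 25])

    # The side input arrives as the keyword argument 'limit' in the lambda.
    flagged = numbers | 'CompareToThreshold' >> beam.Map(
        lambda x, limit: (x, x > limit),
        limit=beam.pvalue.AsSingleton(threshold))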
Note: in Java, you use it as .asMap().
Upvotes: 2