Reputation: 135
Given a relatively small data source (3,000-10,000 key/value pairs), I am trying to process only the records whose key meets a group threshold (50-100 records per key). The simplest method is to group them by key, filter, and unwind - either with FlatMap or a ParDo. The largest group so far has only 1,500 records, but this seems to be a severe bottleneck in production on Google Cloud Dataflow.
Given a list like
(1, 1) (1, 2) (1, 3) ... (2, 1) (2, 2) (2, 3) ...
I run it through a set of transforms to group by key, filter, and unwind:
(p | 'Group' >> beam.GroupByKey()
   | 'Filter' >> beam.Filter(lambda kv: len(list(kv[1])) > 50)
   | 'Unwind' >> beam.FlatMap(lambda kv: kv[1]))
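For reference, here is a self-contained version of this pipeline that reproduces the shape of the problem (a minimal sketch using toy data and the local DirectRunner; the 'Create' and 'Print' steps are just for illustration):

import apache_beam as beam

# Toy data: key 1 has 60 records (meets the threshold), key 2 has 10 (dropped).
data = [(1, i) for i in range(60)] + [(2, i) for i in range(10)]

with beam.Pipeline() as p:
    (p | 'Create' >> beam.Create(data)
       | 'Group' >> beam.GroupByKey()
       | 'Filter' >> beam.Filter(lambda kv: len(list(kv[1])) > 50)
       | 'Unwind' >> beam.FlatMap(lambda kv: kv[1])
       | 'Print' >> beam.Map(print))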
Any ideas on how to make this more performant? Thanks for your help!
Upvotes: 4
Views: 3705
Reputation: 11021
This is an interesting corner case for a pipeline. I believe that your issue here is with the way you read the data that comes out of GroupByKey. Let me give you a quick summary of how GBK works.
GroupByKey, and how big data systems implement it

All big data systems implement ways to perform operations over multiple elements that share the same key. This was called reduce in MapReduce; in other big data systems it is called Group By Key or Combine.
When you run a GroupByKey transform, Dataflow needs to gather all the elements for a single key onto the same machine. Since different elements for the same key may be produced on different machines, the data needs to be serialized and shuffled between workers.
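To make this concrete, here is a toy illustration (a sketch on the local runner, not your production setup) showing that GroupByKey turns (key, value) pairs into (key, iterable) pairs, with all values for a key gathered together:

import apache_beam as beam

with beam.Pipeline() as p:
    (p | beam.Create([(1, 'a'), (2, 'x'), (1, 'b'), (1, 'c')])
       | beam.GroupByKey()
       # Materialize the grouped iterable just for display; each element
       # now carries every value for its key, e.g. (1, ['a', 'b', 'c']).
       | beam.Map(lambda kv: (kv[0], sorted(kv[1])))
       | beam.Map(print))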
This means that when you read data that comes out of a GroupByKey, you are reading from the workers' IO (i.e. from shuffle, not from memory), so you really want to avoid reading shuffle data more times than necessary.
I believe that your problem here is that Filter and Unwind each read the data from shuffle separately (so you read the data for each list twice). What you want is to read your shuffle data only once. You can do this with a single FlatMap that both filters and unwinds your data without double-reading from shuffle. Something like this:
def unwind_and_filter(kv):
    # Python 3: tuple parameters are not allowed, so unpack explicitly
    key, values = kv
    # This consumes all the data from shuffle in a single pass
    value_list = list(values)
    if len(value_list) > 50:
        # Yield each record individually so the output is truly unwound
        yield from value_list
(p | 'Group' >> beam.GroupByKey()
   | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter))
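For instance, with toy data shaped like your question's (60 records under key 1, 10 under key 2), a local run emits only key 1's values, and each grouped list is read from shuffle exactly once (a sketch; the 'Create' and 'Print' labels are illustrative):

import apache_beam as beam

data = [(1, i) for i in range(60)] + [(2, i) for i in range(10)]

with beam.Pipeline() as p:
    (p | 'Create' >> beam.Create(data)
       | 'Group' >> beam.GroupByKey()
       # unwind_and_filter is the function defined above
       | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)
       | 'Print' >> beam.Map(print))  # only key 1's 60 values come out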
Let me know if this helps.
Upvotes: 4