Parker Heindl

Reputation: 135

Why is FlatMap after GroupByKey in Apache Beam python so slow?

Given a relatively small data source (3,000-10,000 key/value pairs), I am trying to process only the records that meet a group threshold (50-100 records per key). So the simplest method is to group them by key, filter, and unwind - either with FlatMap or a ParDo. The largest group has only 1,500 records so far. But this seems to be a severe bottleneck in production on Google Cloud Dataflow.

With given list

(1, 1) (1, 2) (1, 3) ... (2, 1) (2, 2) (2, 3) ...

run through a set of transforms to group by key, filter, and unwind:

(p | 'Group' >> beam.GroupByKey()
   | 'Filter' >> beam.Filter(lambda kv: len(list(kv[1])) > 50)
   | 'Unwind' >> beam.FlatMap(lambda kv: kv[1]))

Any ideas on how to make this more performant? Thanks for your help!

Upvotes: 4

Views: 3705

Answers (1)

Pablo

Reputation: 11021

This is an interesting corner case for a pipeline. I believe that your issue here is in the way you read the data that comes from GroupByKey. Let me give you a quick summary of how GBK works.

What's GroupByKey, and how big data systems implement it

All big data systems provide ways to perform operations over multiple elements of the same key. This was called reduce in MapReduce, and in other big data systems it is called Group By Key, or Combine.

When you do a GroupByKey transform, Dataflow needs to gather all the elements for a single key onto the same machine. Since different elements for the same key may be processed on different machines, the data needs to be serialized and shuffled between machines.

This means that when you read data that comes from a GroupByKey, you are reading from the workers' IO (i.e. not from memory), so you really want to avoid reading the shuffle data more times than necessary.
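
To make this concrete, here is a minimal sketch of what comes out of a GroupByKey (using an in-memory beam.Create source and the local runner, purely for illustration):

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (p
       | 'Create' >> beam.Create([(1, 1), (1, 2), (1, 3), (2, 1)])
       | 'Group' >> beam.GroupByKey()
       # Each element is now (key, iterable of values). On Dataflow that
       # iterable streams from shuffle each time it is iterated.
       | 'Show' >> beam.Map(print))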

How this translates to your pipeline

I believe that your problem here is that Filter and Unwind each read the data from shuffle separately (so you read the data for each list twice). What you want to do is read your shuffle data only once. You can do this with a single FlatMap that both filters and unwinds your data without double-reading from shuffle. Something like this:

def unwind_and_filter(kv):
  key, values = kv
  # This consumes all the data from shuffle in a single pass
  value_list = list(values)
  if len(value_list) > 50:
    # Yield elements one by one so FlatMap unwinds the group
    for value in value_list:
      yield value

(p | 'Group' >> beam.GroupByKey()
   | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter))
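
As a quick sanity check, you can exercise the combined transform locally like this (a sketch with toy data, and the threshold lowered to 2 so the example produces output):

import apache_beam as beam

def unwind_and_filter(kv):
  key, values = kv
  value_list = list(values)  # one pass over the shuffle data
  if len(value_list) > 2:    # threshold lowered for the toy data
    for value in value_list:
      yield value

with beam.Pipeline() as p:
  _ = (p
       | 'Create' >> beam.Create([(1, 'a'), (1, 'b'), (1, 'c'), (2, 'x')])
       | 'Group' >> beam.GroupByKey()
       | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)
       # Prints a, b, c; key 2's group is below the threshold and dropped
       | 'Show' >> beam.Map(print))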

Let me know if this helps.

Upvotes: 4
