scordata

Reputation: 149

Side inputs kill Dataflow performance

I'm using Dataflow to generate a large amount of data.

I've tested two versions of my pipeline: one with a side input (of varying sizes), and the other without.

When I run the pipeline without the side input, the job finishes in about 7 minutes. When I run it with the side input, the job never finishes.

Here's what my DoFn looks like:

public class MyDoFn extends DoFn<String, String> {

    // View over the grouped side-input data; materialized as a Map on each worker.
    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
    // Local list captured at construction time and serialized with the DoFn.
    final List<CSVRecord> stuff;

    // Counts processed elements in the Dataflow monitoring UI.
    private Aggregator<Integer, Integer> dofnCounter =
            createAggregator("DoFn Counter", new Sum.SumIntegerFn());

    public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
        this.pCollectionView = pcv;
        this.stuff = m;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        // Materialize the entire side input as a Map for this element's window.
        Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);

        processContext.output(AnotherClass.generateData(stuff, pdata));

        dofnCounter.addValue(1);
    }
}

And here's what my pipeline looks like:

final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<KV<String, TreeMap<Long, Float>>> data = p
        .apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
        .apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
            @Override
            public void processElement(ProcessContext processContext) throws Exception {

                // Parse some data

                processContext.output(KV.of(key, value));
            }
        }));

final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
        data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
                .apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());


DoFn<String, String> dofn = new MyDoFn(pcv, localList);

p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
        .apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
        .apply(TextIO.Write.named("Write_out").to(outputFile));

p.run();

We've spent about two days trying various approaches, and we've narrowed the problem down to the inclusion of the side input. Even if processElement is modified to never read the side input, the job stays very slow as long as the side input is attached via .withSideInputs(); if we drop the .withSideInputs() call, it's fast again.

Just to clarify, we've tested this with side input sizes ranging from 20 MB to 1.5 GB.

Very grateful for any insight.

Edit: Including a few job IDs:

2016-01-20_14_31_12-1354600113427960103

2016-01-21_08_04_33-1642110636871153093 (latest)

Upvotes: 4

Views: 2537

Answers (3)

Lukasz Cwik

Reputation: 1731

Please try the Dataflow SDK 1.5.0+; it should address the known performance problems behind your issue.

Side inputs in the Dataflow SDK 1.5.0+ use a new distributed format when running batch pipelines. Note that streaming pipelines, and pipelines using older versions of the Dataflow SDK, are still subject to re-reading the side input if the view cannot be cached entirely in memory.

With the new format, we use an index to provide a block-based lookup and caching strategy. Thus, when looking up a list element by index or a map entry by key, only the block that contains that index or key is loaded. Having a cache size greater than the working-set size aids performance, since frequently accessed indices/keys will not require re-reading the block that contains them.
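For illustration, here is a minimal sketch against the Dataflow SDK 1.x API of a DoFn that does point lookups by key; the one-key-per-element scheme and the LookupDoFn name are assumptions for the example, not taken from the question:

public class LookupDoFn extends DoFn<String, String> {

    private final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> view;

    public LookupDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> view) {
        this.view = view;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Hypothetical: each main-input element carries the key to look up.
        String key = c.element();

        // Under the 1.5.0+ batch format, this point lookup loads (and caches)
        // only the index block containing the key, not the entire map.
        Iterable<TreeMap<Long, Float>> values = c.sideInput(view).get(key);
        if (values != null) {
            c.output(key + " -> " + values);
        }
    }
}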

Upvotes: 6

Davor Bonaci

Reputation: 1729

Side inputs in the Dataflow SDK can, indeed, introduce slowness if not used carefully. Most often, this happens when each worker has to re-read the entire side input per main input element.

You seem to be using a PCollectionView created via asMap. In this case, the entire side input PCollection must fit into the memory of each worker. When needed, the Dataflow SDK copies this data onto each worker to create such a map.

That said, the map on each worker may be created just once or many times, depending on several factors. If it is small enough (usually less than 100 MB), it is likely read only once per worker and reused across elements and across bundles. However, if it cannot fit into our cache (or something else evicts it from the cache), the entire map may be re-read again and again on each worker. This is, most often, the root cause of the slowness.

The cache size is controllable via PipelineOptions, as follows; however, due to several important bug fixes, this should be used only in version 1.3.0 and later.

// Clone the parsed options into the worker-harness view to gain access to setWorkerCacheMb.
DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .create()
        .cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);  // per-worker cache size in MB
Pipeline p = Pipeline.create(opts);

For the time being, the fix is to change the structure of the pipeline to avoid excessive re-reading. I cannot offer specific advice there, as you haven't shared enough information about your use case. (Please post a separate question if needed.) A generic sketch of one such restructuring follows.
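As a purely generic illustration (a sketch only, not advice tailored to this pipeline), one such restructuring replaces a map-shaped side input with a CoGroupByKey join when the main input can also be keyed by the same key. The join classes live in the SDK's transforms.join package; keyedMainInput below is a hypothetical PCollection<KV<String, String>>:

final TupleTag<TreeMap<Long, Float>> dataTag = new TupleTag<TreeMap<Long, Float>>();
final TupleTag<String> mainTag = new TupleTag<String>();

// Join the two keyed collections instead of broadcasting one as a side input.
PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(dataTag, data)                // PCollection<KV<String, TreeMap<Long, Float>>>
        .and(mainTag, keyedMainInput)     // hypothetical PCollection<KV<String, String>>
        .apply(CoGroupByKey.<String>create());

joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Each key sees its grouped data exactly once; no per-element re-reads.
        Iterable<TreeMap<Long, Float>> sideData = c.element().getValue().getAll(dataTag);
        for (String element : c.element().getValue().getAll(mainTag)) {
            // Combine each main element with sideData here.
        }
    }
}));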


We are actively working on a related feature we refer to as distributed side inputs. This will allow a lookup into the side input PCollection without constructing the entire map on the worker. It should significantly help performance in this and related cases. We expect to release this very shortly.


I didn't see anything particularly suspicious about the two jobs you quoted; both were cancelled relatively quickly.

Upvotes: 2

rjh.sgc

Reputation: 11

I'm manually setting the cache size when creating the pipeline in the following manner:

DataflowWorkerHarnessOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create().cloneAs(DataflowWorkerHarnessOptions.class);
opts.setWorkerCacheMb(500);
Pipeline p = Pipeline.create(opts);

For a side input of ~25 MB, this speeds up execution considerably (job ID 2016-01-25_08_42_52-657610385797048159) vs. creating the pipeline in the manner below (job ID 2016-01-25_07_56_35-14864561652521586982):

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

However, when the side input is increased to ~400 MB, no increase in cache size improves performance. Theoretically, is all of the memory indicated by the GCE machine type available for use by the worker? What would invalidate or evict something from the worker cache, forcing the re-read?

Upvotes: -1
