Nira
Nira

Reputation: 469

GCP dataflow - processing JSON takes too long

I am trying to process json files in a bucket and write the results into a bucket:

    DataflowPipelineOptions options = PipelineOptionsFactory.create()
            .as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject("the-project");
    options.setStagingLocation("gs://some-bucket/temp/");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
    .apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
            try {
                JsonFactory factory = JacksonFactory.getDefaultInstance();
                String json = c.element();
                SomeClass e = factory.fromString(json, SomeClass.class);
                // manipulate the object a bit...
                c.output(factory.toString(e));
            } catch (Exception err) {
                LOG.error("Failed to process element: " + c.element(), err);
            }
        }
    }))
    .apply(TextIO.Write.to("gs://some-bucket/output/"));
    p.run();

I have around 50,000 files under the path gs://some-bucket/2016/04/28/ (in sub-directories). My question is: does it make sense that this takes more than an hour to complete? Doing something similar on a Spark cluster in amazon takes about 15-20 minutes. I suspect that I might be doing something inefficiently.

EDIT:

In my Spark job I aggregate all the results in a DataFrame and only then write the output, all at once. I noticed that my pipeline here writes each file separately, I assume that is why it's taking much longer. Is there a way to change this behavior?

Upvotes: 1

Views: 978

Answers (1)

jkff
jkff

Reputation: 17913

Your jobs are hitting a couple of performance issues in Dataflow, caused by the fact that it is more optimized for executing work in larger increments, while your job is processing lots of very small files. As a result, some aspects of the job's execution end up dominated by per-file overhead. Here's some details and suggestions.

  • The job is limited rather by writing output than by reading input (though reading input is also a significant part). You can significantly cut that overhead by specifying withNumShards on your TextIO.Write, depending on how many files you want in the output. E.g. 100 could be a reasonable value. By default you're getting an unspecified number of files which in this case, given current behavior of the Dataflow optimizer, matches number of input files: usually it is a good idea because it allows us to not materialize the intermediate data, but in this case it's not a good idea because the input files are so small and per-file overhead is more important.
  • I recommend to set maxNumWorkers to a value like e.g. 12 - currently the second job is autoscaling to an excessively large number of workers. This is caused by Dataflow's autoscaling currently being geared toward jobs that process data in larger increments - it currently doesn't take into account per-file overhead and behaves not so well in your case.
  • The second job is also hitting a bug because of which it fails to finalize the written output. We're investigating, however setting maxNumWorkers should also make it complete successfully.

To put it shortly:

  • set maxNumWorkers=12
  • set TextIO.Write.to("...").withNumShards(100)

and it should run much better.

Upvotes: 1

Related Questions