bigdataclown

Reputation: 139

Google Dataflow: Request payload size exceeds the limit: 10485760 bytes

When trying to run a large transform on ~800,000 files, I get the above error message when launching the pipeline.

Here is the code:

public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());
    GcsUtil u = getUtil(p.getOptions());

    try {
        // Expand the glob into the individual file paths on the client side
        List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip"));
        List<String> strPaths = new ArrayList<String>();
        for (GcsPath pa : paths) {
            strPaths.add(pa.toUri().toString());
        }

        // Embed the full list of ~800,000 paths in the pipeline via Create.of(...)
        p.apply(Create.of(strPaths))
         .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
        p.run();
    }
    catch (IOException io) {
        // error handling omitted
    }
}

I thought that's exactly what Google Dataflow is for: handling large amounts of files/data?

Is there a way to split the load in order to make it work?

Thanks & BR

Phil

Upvotes: 3

Views: 4899

Answers (2)

bigdataclown

Reputation: 139

Thank you very much! Using your input I solved it like this:

public class ZipPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ZipPipeline.class);

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        try {
            p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))   // only the glob is embedded in the pipeline
             .apply(ParDo.of(new ExpandFN()))                     // expand the glob on the workers
             .apply(ParDo.of(new AddKeyFN()))                     // key each path (fusion break, part 1)
             .apply(GroupByKey.<String, String>create())          // shuffle (fusion break, part 2)
             .apply(ParDo.of(new FlattenFN()))                    // un-group back into individual paths
             .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
            p.run();
        }
        catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }

    private static class FlattenFN extends DoFn<KV<String, Iterable<String>>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            KV<String, Iterable<String>> kv = c.element();
            for (String s : kv.getValue()) {
                c.output(s);
            }
        }
    }

    private static class ExpandFN extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws Exception {
            GcsUtil u = getUtil(c.getPipelineOptions());
            for (GcsPath path : u.expand(GcsPath.fromUri(c.element()))) {
                c.output(path.toUri().toString());
            }
        }
    }

    private static class AddKeyFN extends DoFn<String, KV<String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            String path = c.element();
            // derive a month key: first 6 characters of the 5th "_"-separated token of the path
            String monthKey = path.split("_")[4].substring(0, 6);
            c.output(KV.of(monthKey, path));
        }
    }
}

Upvotes: 1

jkff

Reputation: 17913

Dataflow is good at handling large amounts of data, but it has limits on how large the description of the pipeline itself can be. Data passed to Create.of() is currently embedded in the pipeline description, so you can't pass very large amounts of data that way. Instead, large data should be read from external storage, and the pipeline should specify only its locations.

Think of it as the distinction between the amount of data a program can process vs. the size of the program's code itself.
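To put numbers on it: with ~800,000 files and, say, 60 characters per GCS path (an assumption; the actual file names aren't shown), Create.of() embeds roughly 800,000 * 60 ≈ 48 MB of path strings in the job creation request, well past the 10,485,760-byte (10 MB) limit in the error message.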

You can get around this issue by making the expansion happen in a ParDo:

p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
 .apply(ParDo.of(new ExpandFn()))
 .apply(...fusion break (see below)...)
 .apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")))

where ExpandFn is something like the following:

private static class ExpandFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    GcsUtil util = getUtil(c.getPipelineOptions());
    // expand the glob on the worker and emit each matched path as a string
    for (GcsPath path : util.expand(GcsPath.fromUri(c.element()))) {
      c.output(path.toUri().toString());
    }
  }
}

and by "fusion break" I'm referring to this pattern: basically, ParDo (add a unique key) + GroupByKey + Flatten.iterables() + Values.create(). It's not very convenient, and there are discussions happening about adding a built-in transform to do this (see this PR and this thread).
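A minimal sketch of that fusion break as a reusable composite transform, written against the Dataflow 1.x SDK used in the question (the names BreakFusion and AddRandomKeyFn are made up here; the accepted answer above implements the same pattern with AddKeyFN, GroupByKey and FlattenFN):

import java.util.concurrent.ThreadLocalRandom;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Values;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BreakFusion extends PTransform<PCollection<String>, PCollection<String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public PCollection<String> apply(PCollection<String> input) {
    return input
        .apply(ParDo.of(new AddRandomKeyFn()))        // assign an arbitrary key to each element
        .apply(GroupByKey.<Integer, String>create())  // force materialization via a shuffle
        .apply(Values.<Iterable<String>>create())     // drop the keys
        .apply(Flatten.<String>iterables());          // un-group back into individual elements
  }

  private static class AddRandomKeyFn extends DoFn<String, KV<Integer, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
      c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
    }
  }
}

With that in place, the "...fusion break (see below)..." step in the snippet above becomes .apply(new BreakFusion()).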

Upvotes: 3
