KrazyGautam

Reputation: 2682

How to efficiently read multiple small parquet files with Spark? Is there a CombineParquetInputFormat?

Spark generated multiple small parquet files. How can one efficiently handle many small parquet files, both in the producer and the consumer Spark jobs?

Upvotes: 1

Views: 2994

Answers (2)

KrazyGautam

Reputation: 2682

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;

import java.io.IOException;

/**
 * Packs many small parquet files into combined splits so that one task
 * reads several files instead of one task being spawned per file.
 */
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        // CombineFileRecordReader delegates each file of the combined split to the wrapper below
        return new CombineFileRecordReader(combineSplit, context, CombineParquetRecordReader.class);
    }

    private static class CombineParquetRecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {

        public CombineParquetRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            // wrap the standard ParquetInputFormat with Avro read support for a single file in the split
            super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
        }
    }
}

On the consumer side, use the CombineParquetInputFormat above: it forces multiple small files to be read by a single task.

On the producer side: use coalesce(numFiles) to write an adequate number of files as output.
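For example, a minimal producer-side sketch (the paths and the partition count here are illustrative assumptions, not taken from the original job):

    // producer-side sketch: paths and the partition count are assumptions
    DataFrame result = sqlContext.read().parquet("hdfs:///data/input/");
    result.coalesce(10)                     // write ~10 larger files instead of many small ones
          .write()
          .parquet("hdfs:///data/output/");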

How to use the custom input format in Spark to form an RDD and a DataFrame:

     JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, AvroPojo.class, sc.hadoopConfiguration())
                                            .values()
                                            .map(p -> {
                                                Row row = RowFactory.create(avroPojoToObjectArray(p));
                                                return row;
                                            });


 // read nested input directories recursively
 sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);


 // set max split size, else only 1 task will be spawned
 sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));


     StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
     final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);

Upvotes: 1

Tal Joffe

Reputation: 5828

The most straightforward approach, IMHO, is to use repartition/coalesce (prefer coalesce unless the data is skewed and you want same-sized outputs, in which case use repartition) before writing the parquet files, so that you do not create small files to begin with.

df
  .map(<some transformation>)
  .filter(<some filter>)
  ///...
  .coalesce(<number of partitions>)
  .write
  .parquet(<path>)

The number of partitions can be calculated from the total row count of the DataFrame divided by some factor that, through trial and error, gives you the proper file size.
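For instance, a rough sizing sketch using the Java API (rowsPerFile is a hypothetical factor you would tune by trial and error, and outputPath is a placeholder):

     // sizing sketch: rowsPerFile and outputPath are assumptions, tune until files land in the desired range
     long rowCount = df.count();
     long rowsPerFile = 1_000_000L;                               // adjust by trial and error
     int numPartitions = (int) Math.max(1L, rowCount / rowsPerFile);
     df.coalesce(numPartitions).write().parquet(outputPath);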

It is best practice in most big data frameworks to prefer a few larger files over many small files (the file size I normally aim for is 100-500 MB).

If you already have the data in small files and want to merge them, as far as I'm aware you will have to read them with Spark, repartition to fewer partitions, and write them out again.
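A minimal compaction sketch in the Java API (the paths and the partition count are assumptions):

     // compaction sketch: read the existing small files, repartition, and rewrite them
     DataFrame smallFiles = sqlContext.read().parquet("hdfs:///data/small-files/");
     smallFiles.repartition(16)              // pick a count that yields files of roughly 100-500 MB
               .write()
               .mode(SaveMode.Overwrite)     // org.apache.spark.sql.SaveMode
               .parquet("hdfs:///data/compacted/");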

Upvotes: 1
