Seb

Reputation: 378

coalesce does not reduce the number of output files

I have a Spark job that manages an RDD[SpecificRecordBase] and writes it to HDFS.

My problem is that it generates a lot of files, 95% of which are empty Avro files. I tried to use coalesce to reduce the number of partitions in my RDD, and thus the number of output files, but it has no effect.

    def write(data: RDD[SpecificRecordBase]) = {
      data.coalesce(1, false)    // has no effect
      val conf = new Configuration()
      val job = new org.apache.hadoop.mapreduce.Job(conf)

      AvroJob.setOutputKeySchema(job, schema)
      val pair = new PairRDDFunctions(rdd)
      pair.saveAsNewAPIHadoopFile(
        outputAvroDataPath,
        classOf[AvroKey[SpecificRecordBase]],
        classOf[org.apache.hadoop.io.NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecordBase]],
        job.getConfiguration)
    }

I suppose something is lost between the RDD's partitioning and the HDFS output, and maybe saveAsNewAPIHadoopFile does not take it into account, but I'm not sure.

Have I missed something?

Could somebody explain what really happens when calling saveAsNewAPIHadoopFile with respect to RDD partitioning?

Upvotes: 0

Views: 2543

Answers (1)

Seb

Reputation: 378

Answering my own question thanks to @0x0FFF, the correct code should be:

    def write(data: RDD[SpecificRecordBase]) = {
      val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
      val rdd1Partition = rdd.coalesce(1, false)  // change the number of partitions to 1

      val conf = new Configuration()
      val job = new org.apache.hadoop.mapreduce.Job(conf)

      AvroJob.setOutputKeySchema(job, schema)
      val pair = new PairRDDFunctions(rdd1Partition)  // so only one file will be in the output
      pair.saveAsNewAPIHadoopFile(
        outputAvroDataPath,
        classOf[AvroKey[SpecificRecordBase]],
        classOf[org.apache.hadoop.io.NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecordBase]],
        job.getConfiguration)
    }
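
The mistake in the original code was that the result of coalesce was discarded: coalesce, like every RDD transformation, returns a new RDD and leaves the one it is called on untouched, so it is the returned RDD that has to be passed to saveAsNewAPIHadoopFile. A minimal sketch of the difference, assuming a running SparkContext sc:

    val rdd = sc.parallelize(1 to 100, 10)  // RDD with 10 partitions
    rdd.coalesce(1, false)                  // result discarded: rdd itself still has 10 partitions
    println(rdd.partitions.length)          // prints 10
    val single = rdd.coalesce(1, false)     // keep the returned RDD instead
    println(single.partitions.length)       // prints 1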

Thank you again!

Upvotes: 1
