Reputation: 378
I have a Spark job which manages an RDD[SpecificRecordBase] on HDFS.
My problem is that it generates a lot of files, 95% of which are empty Avro files. I tried to use coalesce to reduce the number of partitions of my RDD, and therefore the number of output files, but it has no effect.
def write(data: RDD[SpecificRecordBase]) = {
  data.coalesce(1, false) //has no effect
  val conf = new Configuration()
  val job = new org.apache.hadoop.mapreduce.Job(conf)
  AvroJob.setOutputKeySchema(job, schema)
  val pair = new PairRDDFunctions(rdd)
  pair.saveAsNewAPIHadoopFile(
    outputAvroDataPath,
    classOf[AvroKey[SpecificRecordBase]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[SpecificRecordBase]],
    job.getConfiguration)
}
I suppose something is lost between the RDD partition configuration and the HDFS partitions, and maybe saveAsNewAPIHadoopFile does not take it into account, but I'm not sure.
Have I missed something?
Could somebody explain what really happens when calling saveAsNewAPIHadoopFile with respect to RDD partitioning?
Upvotes: 0
Views: 2543
Reputation: 378
Answering my own question thanks to @0x0FFF, the correct code should be:
def write(data: RDD[SpecificRecordBase]) = {
  val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
  val rdd1Partition = rdd.coalesce(1, false) //change number of partitions to 1
  val conf = new Configuration()
  val job = new org.apache.hadoop.mapreduce.Job(conf)
  AvroJob.setOutputKeySchema(job, schema)
  val pair = new PairRDDFunctions(rdd1Partition) //so only one file will be in output
  pair.saveAsNewAPIHadoopFile(
    outputAvroDataPath,
    classOf[AvroKey[SpecificRecordBase]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[SpecificRecordBase]],
    job.getConfiguration)
}
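For anyone landing here later: coalesce is a transformation, so it returns a new RDD rather than changing the one it is called on, and saveAsNewAPIHadoopFile writes one part file per partition of the RDD it receives. That is why the first version had no effect and this one works: the coalesced RDD is the one that actually gets written. A minimal sanity-check sketch (the checkPartitions helper is just illustrative, not part of my job):

import org.apache.spark.rdd.RDD

// coalesce returns a new RDD and leaves the original untouched, which is
// why calling it without keeping the result does nothing.
def checkPartitions(data: RDD[_]): Unit = {
  val coalesced = data.coalesce(1, false)
  println(s"original partitions:  ${data.partitions.length}")
  // saveAsNewAPIHadoopFile writes one part file per partition, so this
  // count is the number of output files to expect.
  println(s"coalesced partitions: ${coalesced.partitions.length}")
}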
Thank you again!
Upvotes: 1