sheetalg

Reputation: 59

Hadoop MultipleOutputs does not write to multiple files when file formats are custom format

I am trying to read from Cassandra and write the reducer's output to multiple output files using the MultipleOutputs API (Hadoop version 1.0.3). The file formats in my case are custom output formats extending FileOutputFormat. I have configured my job in a similar manner as shown in the MultipleOutputs API documentation. However, when I run the job, I only get one output file named part-r-00000, which is in text output format. If job.setOutputFormatClass() is not set, it defaults to TextOutputFormat, and in any case it would only allow one of the two format classes to be set. The job completely ignores the output formats I specified in MultipleOutputs.addNamedOutput(job, "format1", MyCustomFileFormat1.class, Text.class, Text.class) and MultipleOutputs.addNamedOutput(job, "format2", MyCustomFileFormat2.class, Text.class, Text.class). Is someone else facing a similar problem, or am I doing something wrong?

I also tried to write a very simple MR program which reads from a text file and writes the output in two formats, TextOutputFormat and SequenceFileOutputFormat, as shown in the MultipleOutputs API documentation. However, no luck there either; I still get only one output file in text output format.
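The test driver for that experiment looked roughly like this (simplified sketch; MultipleOutputsTest, MyMapper, MyReducer and the paths are just placeholders, and the format classes come from org.apache.hadoop.mapreduce.lib.output):

Job job = new Job(getConf(), "multipleOutputsTest");
job.setJarByClass(MultipleOutputsTest.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//one named output per format; the reducer then calls
//mos.write("text", key, value) and mos.write("seq", key, value)
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "seq", SequenceFileOutputFormat.class, Text.class, Text.class);

return job.waitForCompletion(true) ? 0 : 1;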

Can someone help me with this? My job configuration and the relevant classes are below.

Job job = new Job(getConf(), "cfdefGen");
job.setJarByClass(CfdefGeneration.class);

//read input from cassandra column family
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.getConfiguration().set("cassandra.consistencylevel.read", "QUORUM");

//thrift input job configurations
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), HOST);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("classification")));
//ConfigHelper.setRangeBatchSize(job.getConfiguration(), 2048);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

//specification for mapper
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//specifications for reducer (writing to files)
job.setReducerClass(ReducerToFileSystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setOutputFormatClass(MyCdbWriter1.class);
job.setNumReduceTasks(1);

//set output path for storing output files
Path filePath = new Path(OUTPUT_DIR);
FileSystem hdfs = FileSystem.get(getConf());
if(hdfs.exists(filePath)){
    hdfs.delete(filePath, true);
}
MyCdbWriter1.setOutputPath(job, new Path(OUTPUT_DIR));

MultipleOutputs.addNamedOutput(job, "cdb1', MyCdbWriter1.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "cdb2", MyCdbWriter2.class, Text.class, Text.class);

boolean success = job.waitForCompletion(true);
return success ? 0:1;

public static class ReducerToFileSystem extends Reducer<Text, Text, Text, Text>
{
    private MultipleOutputs<Text, Text> mos;

    public void setup(Context context){
        mos = new MultipleOutputs<Text, Text>(context);
    }

    //public void reduce(Text key, Text value, Context context) 
    //throws IOException, InterruptedException (This was the mistake, changed the signature and it worked fine)
    public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException
    {
        for (Text value : values) {
            //context.write(key, value);
            mos.write("cdb1", key, value, OUTPUT_DIR+"/"+"cdb1");
            mos.write("cdb2", key, value, OUTPUT_DIR+"/"+"cdb2");
        }
        context.progress();
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

public class MyCdbWriter1<K, V> extends FileOutputFormat<K, V> 
{
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
    {
        //returns a new CdbDataRecord bound to the task's output file (body omitted)
    }

    public static void setOutputPath(Job job, Path outputDir) {
        job.getConfiguration().set("mapred.output.dir", outputDir.toString());
    }

    protected static class CdbDataRecord<K, V> extends RecordWriter<K, V>
    {
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            //writes one key/value pair in the custom format (body omitted)
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            //flushes and closes the underlying output stream (body omitted)
        }
    }
}
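For comparison, a minimal compilable custom format might look like the sketch below. The class names and the tab-separated write logic are purely illustrative, not my actual format:

public class SimpleTextOutputFormat<K, V> extends FileOutputFormat<K, V>
{
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
    {
        //create the output file under the task's work directory
        Path file = getDefaultWorkFile(job, "");
        FSDataOutputStream out = file.getFileSystem(job.getConfiguration()).create(file, false);
        return new SimpleTextRecordWriter<K, V>(out);
    }

    protected static class SimpleTextRecordWriter<K, V> extends RecordWriter<K, V>
    {
        private final FSDataOutputStream out;

        SimpleTextRecordWriter(FSDataOutputStream out) {
            this.out = out;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            //illustrative only: plain "key<TAB>value" lines
            out.write((key.toString() + "\t" + value.toString() + "\n").getBytes("UTF-8"));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            out.close();
        }
    }
}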

Upvotes: 2

Views: 4452

Answers (2)

xavriley

Reputation: 611

I also encountered a similar issue - mine turned out to be that I was filtering out all of my records in the map phase, so nothing was being passed to reduce. With un-named multiple outputs in the reduce task, this still resulted in a _SUCCESS file and an empty part-r-00000 file.
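In other words, a guard like the following in the mapper is enough to produce exactly that symptom (illustrative sketch, not the actual job from the question):

//illustrative only: a map() that filters out every record means the reducer
//(and therefore MultipleOutputs) never receives any input
public static class FilteringMapper extends Mapper<LongWritable, Text, Text, Text>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        //an over-eager filter that happens to match every record,
        //so nothing is ever emitted and reduce() never runs
        if (value.getLength() > 0) {
            return;
        }
        context.write(new Text("dummy"), value);
    }
}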

Upvotes: 0

sheetalg

Reputation: 59

After debugging I found my mistake: my reduce method was never being called. My method definition did not match the API's definition, so I changed it from public void reduce(Text key, Text value, Context context) to public void reduce(Text key, Iterable<Text> values, Context context). I don't know why my reduce method did not have the @Override annotation; it would have prevented this mistake.
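For anyone hitting the same thing: with @Override in place, the wrong signature becomes a compile-time error instead of silently turning into an unused overload that leaves the default identity reduce running (sketch):

public static class ReducerToFileSystem extends Reducer<Text, Text, Text, Text>
{
    //@Override on reduce(Text, Text, Context) would not compile, because
    //Reducer only defines reduce(KEYIN, Iterable<VALUEIN>, Context);
    //without the override, the default identity reduce runs and the output
    //goes to the default TextOutputFormat (hence the single part-r-00000)
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        //...
    }
}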

Upvotes: 3
