JPC

Reputation: 5173

how to custom select column reading for hadoop input in java for map reducer job

New to Hadoop and I'm trying to understand how Hadoop reads file input. I am able to use the code below to run a Hadoop job from a 2-column (key/value) input file:
[screenshot: sample two-column (key/value) input file]

But what if I have 5 columns and the key/value pair I want is A&E (instead of A&B)? Which function exactly do I need to modify?

[screenshot: sample multi-column input file]

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverterCounter extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            // Invert: emit the input value as the key and the input key as the value
            output.collect(value, key);
        }
    }   
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            // Count how many records arrived with this key
            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }   
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();      
        JobConf job = new JobConf(conf, InverterCounter.class);    
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);   
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        // The map emits Text/Text while the reduce emits Text/IntWritable,
        // so the map output types must be declared separately
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.set("key.value.separator.in.input.line", ",");       
        JobClient.runJob(job);      
        return 0;
    }  
    public static void main(String[] args) throws Exception { 
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);       
        System.exit(res);
    }
}

Any recommendation would be appreciated. I was trying to change job.set("key.value.separator.in.input.line", ",") and job.setInputFormat(KeyValueTextInputFormat.class) with no luck; I still could not figure this out.

Thanks

Upvotes: 1

Views: 2724

Answers (1)

Jeremy Beard

Reputation: 2725

KeyValueTextInputFormat assumes that the key is at the start of each line, so it isn't applicable to your six-column data set.
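To see why, note that KeyValueTextInputFormat splits each line only at the first occurrence of the separator, so with a comma separator every column after the first is lumped together into the value. A rough sketch of that splitting rule (an illustration, not the actual Hadoop internals), using a hypothetical line a,b,c,d,e,f:

String line = "a,b,c,d,e,f";
int pos = line.indexOf(",");              // only the first separator counts
String key = line.substring(0, pos);      // "a"         -> column A
String value = line.substring(pos + 1);   // "b,c,d,e,f" -> all remaining columns

This is why changing key.value.separator.in.input.line alone could never select column E on its own: no separator setting makes the split happen anywhere but at the first match.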

Instead, you can use TextInputFormat and extract the key and value yourself. I'm assuming all values in the line are separated by commas (and that there are no commas in the data, which is another story).

With TextInputFormat you receive the full line in value, and the byte offset of the line within the file in key (as a LongWritable). We don't need the offset, so we will ignore it. With the full line in a single Text we can turn it into a String, split it by commas, and derive the key and value to emit:

// Same imports as the version above, except replace KeyValueTextInputFormat
// with org.apache.hadoop.mapred.TextInputFormat and add org.apache.hadoop.io.LongWritable
public class InverterCounter extends Configured implements Tool {

    // With TextInputFormat the input key is the line's byte offset in the
    // file, so the mapper's input key type is LongWritable rather than Text
    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            // Split the six comma-separated columns; A and E (indices 0 and 4)
            // form the key, while B, C, D and F form the value
            String[] lineFields = value.toString().split(",");
            Text outputKey = new Text(lineFields[0] + "," + lineFields[4]);
            Text outputValue = new Text(lineFields[1] + "," + lineFields[2] + "," +
                                        lineFields[3] + "," + lineFields[5]);

            output.collect(outputKey, outputValue);
        }
    }   
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }   
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();      
        JobConf job = new JobConf(conf, InverterCounter.class);    
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);   
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        // The map emits Text/Text while the reduce emits Text/IntWritable,
        // so the map output types must be declared separately
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        JobClient.runJob(job);
        return 0;
    }  
    public static void main(String[] args) throws Exception { 
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);       
        System.exit(res);
    }
}

I haven't had a chance to test this, so there may be small bugs. You would probably want to rename the class because it is no longer inverting anything. Finally, the value has been sent to the reducer but it isn't being used, so you could just as easily send a NullWritable instead.
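A sketch of that NullWritable variant (same untested caveat as above; it would also need an import of org.apache.hadoop.io.NullWritable, plus job.setMapOutputValueClass(NullWritable.class) in run(), since the map and reduce value types then differ):

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, NullWritable> {

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, NullWritable> output,
                        Reporter reporter) throws IOException {

            String[] lineFields = value.toString().split(",");
            Text outputKey = new Text(lineFields[0] + "," + lineFields[4]);

            // No payload is needed just to count occurrences of the key
            output.collect(outputKey, NullWritable.get());
        }
    }
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, NullWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<NullWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }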

Upvotes: 1
