I'm new to Hadoop and trying to understand how Hadoop reads file input. I can use the code below to run a Hadoop job on a two-column (key/value) input file.
But what if I have 5 columns and the key/value pair I want is A&E (instead of A&B)? Which function exactly do I need to modify?
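import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverterCounter extends Configured implements Tool {

    // Receives the (key, value) pairs produced by KeyValueTextInputFormat
    // and emits them swapped: (value, key).
    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            output.collect(value, key);
        }
    }

    // Counts how many values arrived for each key.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, InverterCounter.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        // Each line is split at the first separator into (key, value).
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        // Map output is (Text, Text) but reduce output is (Text, IntWritable),
        // so the map output types are declared separately.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.set("key.value.separator.in.input.line", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);
        System.exit(res);
    }
}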
Any recommendations would be appreciated. I tried changing job.set("key.value.separator.in.input.line", ","); and job.setInputFormat(KeyValueTextInputFormat.class); but had no luck, and I still can't figure this out.
Thanks
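KeyValueTextInputFormat splits each line at the first occurrence of the separator: everything before it becomes the key and everything after it becomes the value. That suits a two-column file, but it can't pick out non-adjacent columns such as A and E, so it isn't applicable to your data set.
Instead, you can use TextInputFormat and extract the key and value yourself. I'm assuming all values in the line are separated by commas (and that there are no commas in the data, which is another story).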
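The first-separator rule is easy to try without a cluster. The sketch below is a plain-Java imitation of how KeyValueTextInputFormat divides a line; it is not Hadoop's code, and the class FirstSeparatorSplit and its split method are hypothetical names. A two-column line yields exactly the pair the original job expects, while a six-column line leaves everything after column A in the value, which is why changing the separator or the input format alone can't select A and E.

```java
import java.util.Arrays;

/**
 * Plain-Java illustration (hypothetical, not Hadoop code) of how
 * KeyValueTextInputFormat divides a line: at the FIRST separator only.
 */
final class FirstSeparatorSplit {

    /** Returns {key, value}; with no separator, the whole line is the key and the value is empty. */
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        // Two columns: key and value are exactly A and B.
        System.out.println(Arrays.toString(split("A,B", ',')));         // prints [A, B]
        // Six columns: only column A becomes the key; B..F all stay in the value.
        System.out.println(Arrays.toString(split("A,B,C,D,E,F", ','))); // prints [A, B,C,D,E,F]
    }
}
```

If a line contains no separator at all, the whole line becomes the key and the value is empty, which matches what Hadoop's record reader for this format does.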
With TextInputFormat you receive the full line in value, and the position of the line in the file (its byte offset, as a LongWritable) in key. We don't need the position, so we will ignore it. With the full line in a single Text, we can turn it into a String, split it by commas, and derive the key and value to emit:
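import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverterCounter extends Configured implements Tool {

    // With TextInputFormat the input key is the line's byte offset (LongWritable),
    // not Text, so the Mapper's first type parameter must be LongWritable.
    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            // Split the whole line into its comma-separated fields (A..F).
            String[] lineFields = value.toString().split(",");
            // Key = columns A and E; value = the remaining columns.
            Text outputKey = new Text(lineFields[0] + "," + lineFields[4]);
            Text outputValue = new Text(lineFields[1] + "," + lineFields[2] + "," +
                                        lineFields[3] + "," + lineFields[5]);
            output.collect(outputKey, outputValue);
        }
    }

    // Counts how many lines share each (A, E) key.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.collect(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, InverterCounter.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("InverterCounter");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        // The mapper receives whole lines; no separator setting is needed.
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        // Map output is (Text, Text) but reduce output is (Text, IntWritable).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new InverterCounter(), args);
        System.exit(res);
    }
}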
I haven't had a chance to test this, so there may be small bugs. You would probably want to rename the class, because it no longer inverts anything. Finally, the value is sent to the reducer but never used, so you could just as easily send a NullWritable instead.
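To check the mapper and reducer logic locally before submitting the job, here is a single-process, plain-Java simulation of the same data flow. No Hadoop is involved, and ColumnPickCount, map and run are hypothetical names. map mirrors MapClass by keying on columns A and E, a TreeMap stands in for the shuffle's grouping and sorting by key, and run counts each group the way Reduce does. One assumption differs from the job above: lines with fewer than six fields are skipped here, whereas lineFields[5] in the real mapper would throw an ArrayIndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Local simulation (hypothetical, no Hadoop) of: map on columns A and E, group by key, count. */
final class ColumnPickCount {

    /** Mirrors MapClass: returns {key, value}, or null when the line has fewer than 6 fields. */
    static String[] map(String line) {
        // Note: String.split drops trailing empty fields, so "a,b,c,d,e," has only 5.
        String[] f = line.split(",");
        if (f.length < 6) {
            return null; // assumption: skip short lines instead of failing
        }
        String key = f[0] + "," + f[4];
        String value = f[1] + "," + f[2] + "," + f[3] + "," + f[5];
        return new String[] {key, value};
    }

    /** Groups map output by key (sorted, like the shuffle) and counts each group, like Reduce. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] kv = map(line);
            if (kv != null) {
                groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "a1,b1,c1,d1,e1,f1",
                "a1,b2,c2,d2,e1,f2",
                "a2,b3,c3,d3,e1,f3");
        System.out.println(run(input)); // prints {a1,e1=2, a2,e1=1}
    }
}
```

Because the simulation only ever reads the group sizes, it also shows why the map output value could be replaced by a NullWritable without changing the counts.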