To expand on my header: I'm using Hadoop 2.6 and need to send whole files to my mapper instead of a single line at a time. I have followed Tom White's code in the Definitive Guide to create WholeFileInputFormat and WholeFileRecordReader, but my Mapper is still processing files one line at a time. Can anyone see what I'm missing in my code? I used the book example exactly, as far as I can see. Any guidance would be much appreciated.
WholeFileInputFormat.java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // never split: each file is handed to a single mapper whole
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
WholeFileRecordReader.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the entire file into one byte array, emitted as a single record.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close: the stream is closed in nextKeyValue()
    }
}
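For what it's worth, the core of nextKeyValue() above, reading a whole file into one byte[] rather than line by line, can be sketched in plain Java without any Hadoop dependency (the temp file and its contents here are just an illustration):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class WholeFileDemo {
    public static void main(String[] args) throws IOException {
        // Write a small multi-line file, then read it back in one shot,
        // mirroring what WholeFileRecordReader does with IOUtils.readFully.
        Path tmp = Files.createTempFile("whole", ".txt");
        Files.write(tmp, "line1\nline2\nline3\n".getBytes(StandardCharsets.UTF_8));

        byte[] contents = Files.readAllBytes(tmp); // whole file, not one line
        System.out.println(contents.length);       // 18 bytes
        System.out.println(new String(contents, StandardCharsets.UTF_8)
                .split("\n").length);              // all 3 lines in one record
        Files.delete(tmp);
    }
}
```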
And the main method for my MapReduce job:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ECCCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
            System.exit(-1);
        }

        //@SuppressWarnings("deprecation")
        Job job = new Job();
        job.setJarByClass(ECCCount.class);
        job.setJobName("ECCCount");

        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(ECCCountMapper.class);
        job.setReducerClass(SumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
And my Mapper, for good measure. Right now it simply emits the value it's given, as a test case to see whether it receives a line or the whole file:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value), new IntWritable(1));
    }
}
Upvotes: 1
Views: 1737
Thanks to Ramzy's input I found my error and was able to get the whole file passed with the following changes.
In my main method, I needed to specify the InputFormatClass to use:
job.setInputFormatClass(WholeFileInputFormat.class);
and my Mapper needs to expect the correct types as input:
public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {
Those two changes successfully sent a byte[] of the entire file to my mapper, where I manipulate it as needed.
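One caveat when manipulating that byte[] (a general Hadoop gotcha, not something from the code above): BytesWritable.getBytes() returns the padded backing buffer, which can be longer than the actual data, so only the first getLength() bytes are valid. A plain-Java sketch of the trim, with an over-allocated array standing in for the padded buffer:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PaddingDemo {
    public static void main(String[] args) {
        // Simulate a BytesWritable whose backing buffer is larger than
        // its valid contents (here: 16-byte capacity, 5 valid bytes).
        byte[] backing = new byte[16];
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(data, 0, backing, 0, data.length);
        int length = data.length; // what BytesWritable.getLength() would report

        // Trim to the valid bytes before converting or processing.
        byte[] exact = Arrays.copyOf(backing, length);
        System.out.println(new String(exact, StandardCharsets.UTF_8)); // hello
    }
}
```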
Upvotes: 3
Reputation: 7138
The issue can be the input format of the mapper. You have LongWritable and Text; however, in the example mentioned, they used NullWritable and BytesWritable, because that is what WholeFileInputFormat produces. Also, you need to call job.setInputFormatClass(WholeFileInputFormat.class); in the Job class (main method). Hope it helps, and happy coding.
Upvotes: 3