Reputation: 1478
Recent versions of Hadoop already easily support nested input directories using FileInputFormat.setInputDirRecursive, which relies on the mapreduce.input.fileinputformat.input.dir.recursive configuration key.
It's also possible to specify multiple mapper/input-directory combinations using MultipleInputs.addInputPath.
But can I do both at the same time? In other words, is there a way to specify multiple mapper/input-directory combinations where the input directories are included recursively?
A concrete example: I have the following directory structure:
/dataset1/subdir1/data1.txt
/dataset2/subdir2/data2.txt
I tried something like this:
Job job = Job.getInstance(conf);
FileInputFormat.setInputDirRecursive(job, true);
MultipleInputs.addInputPath(job, new Path("/dataset1"), TextInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job, new Path("/dataset2"), TextInputFormat.class,
Mapper2.class);
...
job.waitForCompletion(true);
But then I get an exception along the lines of Error: java.io.IOException: 's3://bucketname/dataset1/subdir1' is a directory
This is running in Amazon EMR under Hadoop 2.4.0.
Edit: Hadoop version is 2.4.0, not 2.6.0
Upvotes: 0
Views: 914
Reputation: 729
Well, I'm not sure about S3, but this is normal: each input path should point to a file, not a directory.
Try this.
Method 1
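import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Walks the tree under path and registers every file with Mapper1 (useMapper1 == true) or Mapper2.
public static void addInputPathRecursively(FileSystem fs, Path path, PathFilter inputFilter, Job job, boolean useMapper1) throws IOException
{
    for (FileStatus stat : fs.listStatus(path, inputFilter))
    {
        if (stat.isDirectory())
        {
            // Pass the flag along so nested files get the same mapper.
            addInputPathRecursively(fs, stat.getPath(), inputFilter, job, useMapper1);
        } else if (useMapper1)
        {
            MultipleInputs.addInputPath(job, stat.getPath(), TextInputFormat.class, Mapper1.class);
        } else
        {
            MultipleInputs.addInputPath(job, stat.getPath(), TextInputFormat.class, Mapper2.class);
        }
    }
}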
In the driver class you can call it accordingly.
// FileFilter is a custom PathFilter implementation (not shown here).
addInputPathRecursively(fs, dataset1Path, new FileFilter(conf, fs, new String[] { txt }), job, true);
addInputPathRecursively(fs, dataset2Path, new FileFilter(conf, fs, new String[] { txt }), job, false);
This is only an example, but it works. Set up the PathFilter properly if you want to match paths with a regex.
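One thing to watch for with Method 1: fs.listStatus(path, inputFilter) applies the filter to every entry, directories included. A filter that only accepts .txt names would therefore also reject subdir1, and the recursion would never reach the files below it. Here is a minimal sketch of the decision logic such a filter could use. It relies only on java.util.regex so it runs without Hadoop; the class name RecursiveTxtFilterSketch, the method signature, and the .txt pattern are all made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper: the decision logic a custom PathFilter could delegate to.
final class RecursiveTxtFilterSketch {
    // Assumed pattern: file names ending in ".txt", case-insensitive.
    private static final Pattern TXT = Pattern.compile(".*\\.txt$", Pattern.CASE_INSENSITIVE);

    private RecursiveTxtFilterSketch() {}

    /**
     * @param name        last component of the path, e.g. "data1.txt"
     * @param isDirectory whether the path refers to a directory
     * @return true if the entry should be kept by the listing
     */
    static boolean accept(String name, boolean isDirectory) {
        if (isDirectory) {
            // Always keep directories so the recursive walk can descend into them.
            return true;
        }
        // Keep only files whose name matches the pattern.
        return TXT.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(accept("subdir1", true));    // prints "true"
        System.out.println(accept("data1.txt", false)); // prints "true"
        System.out.println(accept("data1.csv", false)); // prints "false"
    }
}
```

In an actual PathFilter, accept(Path) only receives the path, so the filter would have to find out for itself whether it is a directory (for example through the FileSystem it was constructed with) and pass path.getName() as the name.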
Method 2
Setting this should do the trick too.
FileInputFormat.setInputDirRecursive(job, true);
Method 3
Bypass this inside the mapper and process at the line level. (Not a good idea!)
Upvotes: 1