Josh Hansen

Reputation: 1478

Can Hadoop MultipleInputs.addInputPath be made to work recursively?

Recent versions of Hadoop already easily support nested input directories using FileInputFormat.setInputDirRecursive, which relies on the mapreduce.input.fileinputformat.input.dir.recursive configuration key.

It's also possible to specify multiple mapper/input-directory combinations using MultipleInputs.addInputPath.

But can I do both at the same time? In other words, is there a way to specify multiple mapper/input-directory combinations where the input directories are included recursively?

A concrete example: I have the following directory structure:

I tried something like this:

Job job = Job.getInstance(conf);
FileInputFormat.setInputDirRecursive(job, true);
MultipleInputs.addInputPath(job, new Path("/dataset1"), TextInputFormat.class,
    Mapper1.class);
MultipleInputs.addInputPath(job, new Path("/dataset2"), TextInputFormat.class,
    Mapper2.class);
...
job.waitForCompletion(true);

But then I get an exception along the lines of Error: java.io.IOException: 's3://bucketname/dataset1/subdir1' is a directory

This is running in Amazon EMR under Hadoop 2.4.0.

Edit: Hadoop version is 2.4.0, not 2.6.0

Upvotes: 0

Views: 914

Answers (1)

rbyndoor

Reputation: 729

I'm not sure about S3, but this is normal: each input path should point to a file, not a directory.

Try this.

Method 1

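import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Registers every file under `path` with Mapper1 (useMapper1 == true) or Mapper2.
public static void addInputPathRecursively(FileSystem fs, Path path, PathFilter inputFilter,
        Job job, boolean useMapper1) throws IOException {
    for (FileStatus stat : fs.listStatus(path, inputFilter)) {
        if (stat.isDirectory()) {
            // Descend into the subdirectory, carrying the mapper choice along
            addInputPathRecursively(fs, stat.getPath(), inputFilter, job, useMapper1);
        } else if (useMapper1) {
            MultipleInputs.addInputPath(job, stat.getPath(), TextInputFormat.class, Mapper1.class);
        } else {
            MultipleInputs.addInputPath(job, stat.getPath(), TextInputFormat.class, Mapper2.class);
        }
    }
}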
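To check the traversal without a cluster, the same recursion can be modeled on a local directory tree. This is only a sketch under stated assumptions: it uses java.nio.file instead of Hadoop's FileSystem, the class name RecursiveInputSketch, the mapper labels, and the registry map are hypothetical, and the map entry stands in for the call to MultipleInputs.addInputPath.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Local-filesystem model of the recursion; this is not Hadoop code.
class RecursiveInputSketch {

    // Walks `dir` depth-first and records every regular file under it with the
    // given mapper label. The label is passed down unchanged on each recursive
    // call, so files in nested directories get the same mapper as their root.
    static void addRecursively(Path dir, String mapperLabel, Map<String, String> registry)
            throws IOException {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    addRecursively(entry, mapperLabel, registry);
                } else {
                    // Stand-in for MultipleInputs.addInputPath(job, file, ..., mapper)
                    registry.put(entry.toString(), mapperLabel);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Throwaway tree: dataset1/subdir1/a.txt and dataset2/b.txt
        Path root = Files.createTempDirectory("inputs");
        Path nested = Files.createDirectories(root.resolve("dataset1").resolve("subdir1"));
        Path flat = Files.createDirectories(root.resolve("dataset2"));
        Files.createFile(nested.resolve("a.txt"));
        Files.createFile(flat.resolve("b.txt"));

        Map<String, String> registry = new TreeMap<>();
        addRecursively(root.resolve("dataset1"), "Mapper1", registry);
        addRecursively(root.resolve("dataset2"), "Mapper2", registry);

        registry.forEach((file, mapper) -> System.out.println(mapper + " <- " + file));
    }
}
```

The Hadoop method above is used the same way, once per dataset root.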

In the driver class you can call it accordingly.

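// FileFilter here is a custom PathFilter implementation, not a Hadoop class
addInputPathRecursively(fs, dataset1Path, new FileFilter(conf, fs,
        new String[] { txt }), job, true);   // files under dataset1 go to Mapper1

addInputPathRecursively(fs, dataset2Path, new FileFilter(conf, fs,
        new String[] { txt }), job, false);  // files under dataset2 go to Mapper2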

This is only an example, but it works. If you want to select files by regular expression, make sure the PathFilter is written accordingly.
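One thing to watch with a regex-based filter: FileSystem.listStatus(path, filter) applies the filter to directories as well as files, so a filter that only matches file names would reject every subdirectory and the recursion would never descend. A minimal sketch of the idea follows, assuming a local stand-in: it uses java.nio.file and a Predicate in place of Hadoop's PathFilter, and the class name RegexInputFilterSketch and method regexFilter are hypothetical. In a real PathFilter, which only receives the Path, the directory check would have to go through the FileSystem.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Stand-in for a Hadoop PathFilter, written against java.nio.file so it runs locally.
class RegexInputFilterSketch {

    // Accepts every directory (so a recursive listing can descend into it)
    // and only those files whose name fully matches the regex.
    static Predicate<Path> regexFilter(String fileNameRegex) {
        Pattern pattern = Pattern.compile(fileNameRegex);
        return path -> Files.isDirectory(path)
                || pattern.matcher(path.getFileName().toString()).matches();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("filter-demo");
        Path sub = Files.createDirectory(dir.resolve("subdir1"));
        Path txt = Files.createFile(dir.resolve("part-0.txt"));
        Path tmp = Files.createFile(dir.resolve("part-0.tmp"));

        Predicate<Path> filter = regexFilter(".*\\.txt");
        System.out.println(filter.test(sub)); // directory: accepted
        System.out.println(filter.test(txt)); // name matches: accepted
        System.out.println(filter.test(tmp)); // name does not match: rejected
    }
}
```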

Method 2

Setting this should do the magic too.

FileInputFormat.setInputDirRecursive(job, true);

Method 3

Bypass inside the mapper and process at line level. (Not a good idea!)

Upvotes: 1
