Reputation: 279
I am using Hadoop 0.20.2 (that cannot be changed) and I want to add a filter to my input path. The data looks as follows:
/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2
and I only want to process all files with train in them.
A look at the FileInputFormat class suggests to use:
FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)
and this is where my problem starts, since PathFilter is an interface - of course, I can extend the interface but then I still do not have an implementation. So instead, I implemented the interface:
class TrainFilter implements PathFilter
{
    // Interface methods are implicitly public, so the implementation must be public too
    public boolean accept(Path path)
    {
        return path.toString().contains("train");
    }
}
When I use TrainFilter as the PathFilter, the code compiles, but at run time I get an exception about the input path. Without the filter, my code processes all files below /path1; with the filter set, it throws:
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1
Here is how I set it up in the driver code:
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);
Any suggestions of what I am doing wrong here?
EDIT: I found the problem. The first call to the PathFilter is always the directory itself (/path1) and since it does not contain ("train"), the directory itself is invalid and thus the exception is thrown. Which brings me to another question: how can I test if an arbitrary path is a directory? For all I know, I need a reference to the FileSystem, which is not one of the default parameters of PathFilter.
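To make the behaviour described in the EDIT concrete, here is a minimal sketch in plain Java. It is a simplified, hypothetical model and not Hadoop's actual code: the class FilterOrderSketch, the method listInputs, and the use of String paths with java.util.function.Predicate are assumptions made only for illustration. It shows that a filter that is applied to the input directory first, and to its children afterwards, rejects /path1 before any file is looked at.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified, hypothetical model of input listing; this is NOT Hadoop's actual code.
class FilterOrderSketch {

    // The filter is applied to the input path itself first, then to each child.
    static List<String> listInputs(String inputDir, List<String> children, Predicate<String> filter) {
        if (!filter.test(inputDir)) {
            // Mirrors the symptom in the question: a rejected directory looks like a missing one.
            throw new IllegalStateException("Input path does not exist: " + inputDir);
        }
        List<String> accepted = new ArrayList<>();
        for (String child : children) {
            if (filter.test(child)) {
                accepted.add(child);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "/path1/test_a1", "/path1/test_a2", "/path1/train_a1", "/path1/train_a2");

        // Same logic as TrainFilter: "/path1" does not contain "train", so it is rejected.
        Predicate<String> trainOnly = p -> p.contains("train");
        try {
            listInputs("/path1", children, trainOnly);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Letting the directory through makes the same filter usable.
        Predicate<String> dirOrTrain = p -> p.equals("/path1") || p.contains("train");
        System.out.println(listInputs("/path1", children, dirOrTrain));
    }
}
```

In this model, any filter that accepts the directory itself and then filters on the file names gets past the first check, which is why the remaining question is how to recognise a directory inside accept.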
Upvotes: 2
Views: 3241
Reputation: 75
I know this is a very old question, but it helped me find an answer to excluding paths when all the examples of it failed me, such as documented here.
I just want to add a warning about the answer that @ChrisWhite gave: I am working with the Hadoop 3.3.0 API, and using setConf threw a NullPointerException as soon as I tried to read something from the configuration.
Instead, I kept extends Configured in the PathFilter declaration and call getConf().get(<your configuration parameter name>) wherever I need a value from the job configuration.
So, my excluding PathFilter looks like this:
public static class ExcludedPathsFilter extends Configured implements PathFilter {
    public boolean accept(Path includePathGlob) {
        // Read the excluded substring from the job configuration
        String excludedPath = getConf().get("excludedPath");
        boolean accepted = !includePathGlob.toString().contains(excludedPath);
        // debugging
        System.out.println("excludedPath parameter is " + excludedPath + ", includePath parameter is " + includePathGlob.toString() + " and the path is accepted: " + accepted);
        return accepted;
    }
}
Upvotes: 0
Reputation: 641
A quick fix: you can blacklist paths instead of whitelisting them, i.e. return false if the path contains "test" and true otherwise. The directory /path1 does not contain "test", so it is accepted.
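The blacklist idea can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class BlacklistSketch and its helpers name and accept are hypothetical, and paths are modelled as slash-separated strings. Checking only the last path component is a design choice of this sketch, so that a parent directory whose name happens to contain "test" does not exclude everything below it. In a real PathFilter the analogous call would be path.getName().

```java
// Hypothetical illustration of a blacklist-style filter on plain String paths.
class BlacklistSketch {

    // Last component of a slash-separated path, e.g. "/path1/test_a1" -> "test_a1".
    static String name(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Accept everything except entries whose name contains "test".
    static boolean accept(String path) {
        return !name(path).contains("test");
    }

    public static void main(String[] args) {
        String[] paths = {"/path1", "/path1/test_a1", "/path1/train_a1"};
        for (String p : paths) {
            System.out.println(p + " accepted: " + accept(p));
        }
    }
}
```

With the paths from the question, the directory /path1 and the train files pass, while the test files are rejected, so no directory check and no FileSystem reference is needed.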
Upvotes: 2
Reputation: 30089
You can get a FileSystem instance by having your Filter implement the Configurable interface (or extend the Configured class), and create a fileSystem instance variable in the setConf method:
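class TrainFilter extends Configured implements PathFilter
{
    FileSystem fileSystem;

    public boolean accept(Path path)
    {
        try {
            // Always accept directories, so the input directory itself (/path1) is not rejected.
            // isDir() is the 0.20.x name; later releases deprecate it in favour of isDirectory().
            if (fileSystem.getFileStatus(path).isDir()) {
                return true;
            }
        } catch (IOException e) {
            // The status could not be read, so skip this path
            return false;
        }
        return path.toString().contains("train");
    }

    public void setConf(Configuration conf) {
        // Keep the inherited behaviour, otherwise getConf() returns null
        super.setConf(conf);
        if (conf != null) {
            try {
                fileSystem = FileSystem.get(conf);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}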
Upvotes: 1
Reputation: 10650
Alternatively, you may try to loop through all of the files in the given directory and check if the file names begin with train. E.g:
Job job = new Job(conf, "myJob");
List<Path> inputPaths = new ArrayList<Path>();
String basePath = "/user/hadoop/path";
FileSystem fs = FileSystem.get(conf);
// Expand the glob on the driver side and collect every match as an input path
FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
for (FileStatus fstat : listStatus) {
    inputPaths.add(fstat.getPath());
}
FileInputFormat.setInputPaths(job,
    inputPaths.toArray(new Path[inputPaths.size()]));
Upvotes: 6