Reputation: 11
I am fairly new to Hadoop and I am running multiple MapReduce jobs on a 5-node cluster. I have started to get 'Filesystem closed' exceptions when running more than one thread. The jobs work fine when run one at a time. The errors come in after mapping, right before reducing, and look like this:
java.lang.Exception: java.io.IOException: Filesystem closed
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
This doesn't happen all the time, and the failed job will run fine if I re-execute it. Unfortunately, that takes up too much time. I am assuming this has to do with multiple tasks accessing the same input file: when one task finishes, it closes the input file for all tasks. If that is the issue, what I would like to know is how to override this behavior. I tried overriding cleanup within the mapper to re-open the path, but this seems stupid and doesn't work:
@Override
public void cleanup(Context context){
    Job tempJob;
    try {
        tempJob = new Job();
        Path fs = ((FileSplit) context.getInputSplit()).getPath();
        FileInputFormat.addInputPath(tempJob, fs);
        System.out.println("Finished map task for " + context.getJobName());
    } catch (IOException e) {
        e.printStackTrace();
    }
}
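In case it helps explain what I am seeing: my (possibly wrong) understanding is that Hadoop caches FileSystem instances, so FileSystem.get(conf) hands every job in the same JVM the same object, and one job's close() kills it for the rest. A minimal sketch of what I think is happening (the class name is mine, and it assumes fs.defaultFS points at HDFS):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // assumes fs.defaultFS points at HDFS

        // FileSystem.get() caches instances per scheme/authority/user,
        // so both calls return the SAME object.
        FileSystem fs1 = FileSystem.get(conf);
        FileSystem fs2 = FileSystem.get(conf);
        System.out.println(fs1 == fs2); // prints true

        fs1.close();
        // Against HDFS this now fails with java.io.IOException: Filesystem closed,
        // because the shared client was closed above.
        fs2.exists(new Path("/"));
    }
}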
I am also wondering if this is a fundamental problem with using a thread pool to execute Hadoop MapReduce jobs. Thanks for any ideas.
EDIT: I may have been a little unclear when referring to jobs and tasks. I am actually running multiple jobs, each with its own mapper and reducer. Each of these jobs generates a column for a specific table I am creating, say a sum or a count. Each job has its own thread, and they all access the same input file. The problem is that when some of the jobs finish, they throw the 'Filesystem closed' exception. I am also using YARN, if that might make a difference. A simplified sketch of how I am launching the jobs is below.
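Roughly what my driver looks like (simplified; the class names, paths, and per-job setup are placeholders):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // One job per output column (sum, count, ...), all reading the same input.
        for (final String column : new String[] { "sum", "count" }) {
            pool.submit(new Runnable() {
                public void run() {
                    try {
                        Job job = Job.getInstance(new Configuration(), column);
                        // setJarByClass, mapper/reducer/output classes omitted here
                        FileInputFormat.addInputPath(job, new Path("/shared/input"));
                        FileOutputFormat.setOutputPath(job, new Path("/out/" + column));
                        job.waitForCompletion(true);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        pool.shutdown();
    }
}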
Upvotes: 1
Views: 3396
Reputation: 41458
As a general rule, unless you have a very CPU-intensive job, I wouldn't recommend using multiple threads within the same task: it increases the likelihood of issues in your JVM, and the cost of rerunning a task would be far greater. You should probably consider increasing your number of map tasks instead; each task will run in a separate JVM, but it is much cleaner that way.
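For example, one way to get more map tasks out of the same input is to lower the maximum split size, since each split becomes its own map task (the 32 MB value here is purely illustrative):

// Smaller splits => more splits => more map tasks for the same file.
FileInputFormat.setMaxInputSplitSize(job, 32 * 1024 * 1024L);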
If you really want to go the multi-threaded way, then I suspect you're using the wrong type of mapper: for a multi-threaded application you should use a MultithreadedMapper, which has a different implementation of the run method and should be thread-safe. You can use it like this:
job.setMapperClass(MultithreadedMapper.class);
You can specify the number of threads like this:
int numThreads = 42;
MultithreadedMapper.setNumberOfThreads(job, numThreads);
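Putting it together: MultithreadedMapper (from org.apache.hadoop.mapreduce.lib.map) wraps a delegate mapper that holds your actual map logic, so you register your own class alongside it. MyMapper below stands in for your own mapper class:

// MultithreadedMapper runs in the task slot; your map logic lives in the
// delegate class registered via setMapperClass.
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, MyMapper.class); // MyMapper = your own mapper
MultithreadedMapper.setNumberOfThreads(job, numThreads);

Keep in mind that the delegate's map() will be called from several threads at once, so it must not rely on shared mutable state.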
Upvotes: 1