Apk

Reputation: 11

FileAlreadyExistsException while running MapReduce code

This program is supposed to run two chained MapReduce jobs: the output of the first job is taken as the input of the second job.

When I run it, I hit two problems:

  1. Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException
  2. The map phase runs to 100%, but the reducer never runs.

Here's my code:

import java.io.IOException;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;

public class MaxPubYear {
    // First job's mapper: pulls the publication year (4th field of a
    // semicolon-delimited record) and emits (year, 1).
    public static class FrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String delim = ";";
            String tokens[] = value.toString().split(delim);
            if (tokens.length >= 4) {
                Integer year = TryParseInt(tokens[3].replace("\"", "").trim());
                if (year > 0) {
                    context.write(new Text(year.toString()), new IntWritable(1));
                }
            }
        }
    }

    // First job's reducer (also used as the combiner): sums the counts
    // emitted for each year.
    public static class FrequencyReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Second job's mapper: reads the first job's output ("year<TAB>count")
    // and re-keys every record under the constant key 1, so a single
    // reducer sees all (year;count) pairs.
    public static class MaxPubYearMapper extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String delim = "\t";
            String tokens[] = value.toString().split(delim);
            if (tokens.length == 2) {
                Text valtosend = new Text(tokens[0] + ";" + tokens[1]);
                context.write(new IntWritable(1), valtosend);
            }
        }
    }

    // Second job's reducer: scans all (year;count) pairs and emits the
    // year with the highest publication count.
    public static class MaxPubYearReducer extends
            Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            int maxiValue = Integer.MIN_VALUE;
            String maxiYear = "";
            for (Text value : values) {
                String token[] = value.toString().split(";");
                if (token.length == 2
                        && TryParseInt(token[1]).intValue() > maxiValue) {
                    maxiValue = TryParseInt(token[1]);
                    maxiYear = token[0];
                }
            }
            context.write(new Text(maxiYear), new IntWritable(maxiValue));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // First job: count publications per year, writing to a temporary
        // "<output>_temp" directory that feeds the second job.
        Job job = new Job(conf, "Frequency");
        job.setJarByClass(MaxPubYear.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(FrequencyMapper.class);
        job.setCombinerClass(FrequencyReducer.class);
        job.setReducerClass(FrequencyReducer.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            // Second job: read the first job's output and emit the year
            // with the maximum publication count.
            Job secondJob = new Job(conf, "Maximum Publication year");
            secondJob.setJarByClass(MaxPubYear.class);

            secondJob.setOutputKeyClass(Text.class);
            secondJob.setOutputValueClass(IntWritable.class);

            secondJob.setMapOutputKeyClass(IntWritable.class);
            secondJob.setMapOutputValueClass(Text.class);

            secondJob.setMapperClass(MaxPubYearMapper.class);
            secondJob.setReducerClass(MaxPubYearReducer.class);

            FileInputFormat.addInputPath(secondJob, new Path(args[1] + "_temp"));
            FileOutputFormat.setOutputPath(secondJob, new Path(args[1]));
            System.exit(secondJob.waitForCompletion(true) ? 0 : 1);
        }
    }

    // Parse an integer, returning 0 when the input is not a valid number.
    public static Integer TryParseInt(String trim) {
        try {
            return Integer.parseInt(trim);
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}

Upvotes: 0

Views: 197

Answers (1)

Ankur Shanbhag

Reputation: 7804

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException

A MapReduce job does not overwrite the contents of an existing directory. The output path you give to an MR job must be a directory that does not yet exist; the job creates that directory itself and writes its output files into it.

In your code:

FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));

Make sure this path does not exist when you run the MR job; a leftover "_temp" directory from a previous run will trigger exactly this exception.
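
If you want the job to be rerunnable without manual cleanup, a common approach is to delete a leftover output directory programmatically before submitting the job, using Hadoop's FileSystem API. A minimal sketch, assuming the same conf, args, and job variables as in your main method (needs import org.apache.hadoop.fs.FileSystem;):

// Remove a stale output directory from a previous run, if present.
FileSystem fs = FileSystem.get(conf);
Path tempPath = new Path(args[1] + "_temp");
if (fs.exists(tempPath)) {
    fs.delete(tempPath, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, tempPath);

Alternatively, remove the directory from the shell before rerunning: hadoop fs -rm -r <path>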

Upvotes: 3
