Nishant123

Reputation: 1966

MapReduce program outputs only one record

I have written a MapReduce program to analyse a dataset of users which is of this form

UserID::Gender::Age::MoviesRated::Zip Code
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117

I want to

find the top 10 zipcodes based on the average age of the users belonging to each zipcode, in descending order of average age. Top 10 means the 10 zipcodes whose users have the youngest average age.

I have a MapClass, a CombinerClass and a ReducerClass.

My code is as follows

public class TopTenYoungestAverageAgeRaters extends Configured implements Tool {
    private static TreeSet<AverageAge> top10 = new TreeSet<AverageAge>();

    public static class MapClass extends Mapper<LongWritable, Text, Text, AverageAge>
    {

        public boolean isNumeric(String value) // Checks if record is valid
        {
            try
            {
                Integer.parseInt(value);
                return true;
            }
            catch(NumberFormatException e)
            {
                return false;
            }
        }

        public AverageAge toCustomWritable(String[] line)
        {
            AverageAge record = new AverageAge(new IntWritable(Integer.parseInt(line[0])), new IntWritable(Integer.parseInt(line[2])), new Text(line[1]), new IntWritable(Integer.parseInt(line[3])), new Text(line[4]));
            return record;
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] values = line.split("::");
            if(isNumeric(values[0]))
            {
                AverageAge customTuple = toCustomWritable(values);
                context.write(new Text(values[4]), customTuple);
            }

        }
    }

    public static class CombinerClass extends Reducer<Text, AverageAge, Text, AverageAge>
    {
        public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
        {
            AverageAge newRecord = new AverageAge();
            long age = 0;
            int count = 0;
            for(AverageAge value:values)
            {
                age += value.getUserAge();
                count += 1;
            }
            newRecord.setZipCode(key.toString());
            newRecord.setAverageAge((double)(age/count));
            context.write(key, newRecord);
        }
    }


    public static class ReducerClass extends Reducer<Text, AverageAge, NullWritable, AverageAge>
    {

        public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
        {

            for(AverageAge value:values)
            {
                top10.add(value);
                if(top10.size() > 10)
                    top10.remove(top10.last());
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException
        {
            for(AverageAge avg: top10)
            {
                context.write(NullWritable.get(), avg);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        int res = ToolRunner.run(new Configuration(), new TopTenYoungestAverageAgeRaters(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(CombinerClass.class);
        job.setReducerClass(ReducerClass.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AverageAge.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(AverageAge.class);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

The MapClass writes its output with the zipcode as the key and AverageAge (a custom writable class) as the value.

The CombinerClass calculates the average age of the users belonging to that zipcode and writes the zipcode as the key and an AverageAge as the value.

The ReducerClass should give the top 10 zipcodes with their average user age, but I get only one record as output.

I also tried calling System.out.println() in the reducer to see what values are passed to ReducerClass, but nothing is printed to the console (I am running the program locally inside Eclipse).

I am new to MapReduce and can't figure out the error in this program.

Dataset Source

Upvotes: 0

Views: 235

Answers (1)

Judge Mental

Reputation: 5239

The problem statement seems contradictory: the top 10 of a descending average age would be the 10 oldest, not the 10 youngest. Better get some clarification there.

Anyway, there are many, many mistakes here.

  1. Combiners are not guaranteed to ever be called; the framework may run one zero, one, or several times, so the job cannot rely on CombinerClass to compute the average.
  2. If you have more than one reducer task, each one writes up to 10 records to its own output file.
  3. As written, the "top 10" you will get will be the 10 lowest zip codes (lexicographically sorted).
  4. Normally by cleanup() time you are not writing records any more.

What you want is to use the shuffle to put records with the same zipcode together, and use the aggregation classes (Combiner and Reducer) to calculate the average. The "first 10" requirement can't be determined until you have an average age for every zipcode. The crucial point, though, is that to calculate an average in a distributed fashion, you must never lose the denominator until you reduce. Each combiner sees only the records from its own map task, so several combiners across your fleet will likely emit partial results for the same key, and the reducer needs each one's count to weight them correctly.

The mapper takes a record and produces a zipcode key with a (count, age) value:

k::g::a::m::z |=> z |-> ( 1, a )

The combiner takes a collection of values with the same key, sums the denominators, and computes the weighted average:

z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> ( sum( di ), sum( di * ai ) / sum( di ) )

The reducer takes a collection of values with the same key and computes the same weighted average, throwing out the denominator:

z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> sum( di * ai ) / sum( di )

Your algorithm should work whether you supply the combiner or not; a combiner is an optimization that is available only for some map-reduce situations.
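The idea of carrying the denominator along can be sketched in plain Java, without any Hadoop types. This is only an illustration under assumptions: the names Partial, ofAge, and merge are hypothetical and not part of the question's AverageAge class. It shows that merging partial results gives the same average whether or not an intermediate (combiner-like) merge happens first.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAverageDemo {

    /** Hypothetical partial result: how many ages were folded in, and their average. */
    static final class Partial {
        final long count;      // the denominator d
        final double average;  // the running average a

        Partial(long count, double average) {
            this.count = count;
            this.average = average;
        }
    }

    /** What a mapper would emit for one user: a single age with denominator 1. */
    static Partial ofAge(int age) {
        return new Partial(1, age);
    }

    /** Combiner/reducer step: sum the denominators and take the weighted average. */
    static Partial merge(List<Partial> parts) {
        long totalCount = 0;
        double weightedSum = 0.0;
        for (Partial p : parts) {
            totalCount += p.count;
            weightedSum += p.count * p.average;
        }
        return new Partial(totalCount, weightedSum / totalCount);
    }

    public static void main(String[] args) {
        // Three users in one zipcode, aged 10, 20 and 60.
        Partial direct = merge(Arrays.asList(ofAge(10), ofAge(20), ofAge(60)));

        // Same users, but two of them pass through a combiner-like merge first.
        Partial combined = merge(Arrays.asList(ofAge(10), ofAge(20)));
        Partial viaCombiner = merge(Arrays.asList(combined, ofAge(60)));

        System.out.println(direct.average + " " + viaCombiner.average); // prints "30.0 30.0"
    }
}
```

If the combiner dropped the count and passed on only 15.0 for the first two users, a later unweighted merge with 60 would give 37.5 instead of 30.0, which is why the denominator has to survive until the reducer.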

To limit to only the top 10 you now need to re-sort the results by average age.

That means another mapper:

z |-> avg |=> avg |-> z

And a reducer that outputs only the top 10 results (exercise left to the reader). This second job must run with a single reduce task, or you'll get up to 10 results from each reduce task.
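As a rough sketch of the selection step only, here is one way to keep the 10 lowest averages in plain Java, with no Hadoop types. It assumes "top 10" means the youngest averages (the question is ambiguous on this), and the class and method names are hypothetical. A bounded max-heap holds at most n entries and evicts the oldest whenever it overflows.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopNByAverage {

    /** Returns the n entries with the lowest average age, youngest first. */
    static List<Map.Entry<String, Double>> youngest(Map<String, Double> avgByZip, int n) {
        // Order by average age, breaking ties on zipcode so the result is deterministic.
        Comparator<Map.Entry<String, Double>> byAge =
                Map.Entry.<String, Double>comparingByValue()
                        .thenComparing(Map.Entry.<String, Double>comparingByKey());

        // Reversed comparator makes the heap's head the oldest entry currently kept.
        PriorityQueue<Map.Entry<String, Double>> heap = new PriorityQueue<>(byAge.reversed());
        for (Map.Entry<String, Double> entry : avgByZip.entrySet()) {
            heap.add(entry);
            if (heap.size() > n) {
                heap.poll(); // evict the oldest average
            }
        }

        List<Map.Entry<String, Double>> result = new ArrayList<>(heap);
        result.sort(byAge);
        return result;
    }

    public static void main(String[] args) {
        // Made-up averages, purely for illustration.
        Map<String, Double> avgByZip = new LinkedHashMap<>();
        avgByZip.put("48067", 31.5);
        avgByZip.put("70072", 56.0);
        avgByZip.put("55117", 25.0);

        System.out.println(youngest(avgByZip, 2)); // prints "[55117=25.0, 48067=31.5]"
    }
}
```

In an actual Hadoop reducer the framework reuses the same value object while iterating, so any value kept in a collection across iterations would need to be copied first; the plain-Java sketch above does not have that concern.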

Upvotes: 1
