Amnesiac

Reputation: 669

Find the average of numbers using MapReduce

I have been trying to write some code to find the average of numbers using MapReduce.

I am trying to use global counters to reach my goal, but I am not able to set the counter value in the map method of my Mapper, and I am also not able to retrieve the counter value in the reduce method of my Reducer.

Do I have to use a global counter in map anyway (e.g. by using incrCounter(key, amount) of the provided Reporter)? Or would you suggest any different logic to get the average of some numbers?

Upvotes: 12

Views: 42023

Answers (4)

witrin

Reputation: 3763

The arithmetic mean is an aggregate function which is not distributive but algebraic. According to Han et al. an aggregate function is distributive if:

[...] it can be computed [...] as follows. Suppose [...] data are partitioned into n sets. We apply the function to each partition, resulting in n aggregate values. If the result derived by applying the function to the n aggregate values is the same as that derived by applying the function to the entire data set (without partitioning), the function can be computed in a distributed manner.
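
For example, split {1, 2, 3, 4, 5} into the partitions {1, 2} and {3, 4, 5}: the partition means are 1.5 and 4, and the mean of those two values is 2.75, while the mean of the whole set is 3. The sum (3 and 12, combining to 15) and the count (2 and 3, combining to 5) survive partitioning unharmed, which is what makes them distributive and yields 15 / 5 = 3 again.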

Put differently, a distributive function has to be associative and commutative. An aggregate function, however, is algebraic according to Han et al. if:

[...] it can be computed by an algebraic function with m arguments (where m is a bounded positive integer), each of which is obtained by applying a distributive aggregate function.

For the arithmetic mean this is just avg = sum / count, so besides the sum you obviously need to carry the count along as well. Using a global counter for that purpose, however, is a misuse of the API, which describes org.apache.hadoop.mapreduce.Counter as follows:

A named counter that tracks the progress of a map/reduce job.

So counters should typically be used for statistics about the job, not as part of the calculation during the data processing itself.

So everything you have to do within a partition is add your numbers up and keep track of their count together with the sum as a pair (sum, count); a simple encoding is a string like <sum><separator><count>.

In the mapper the count will always be 1 and the sum is the raw value itself. To shrink the map output early you can use a combiner and merge the aggregates as (sum_1 + ... + sum_n, count_1 + ... + count_n). The same must be repeated in the reducer, finished by the final calculation sum / count. Keep in mind that this approach is independent of the key used!

Finally, here is a simple example using the raw crime statistics of the LAPD which calculates the "average crime time" in Los Angeles:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    enum Counters {
        DISCARDED_ENTRY
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Driver(), args));
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(Driver.class);

        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : -1;
    }
}

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse the CSV line
        ArrayList<String> values = this.parse(value.toString());

        // validate the parsed values
        if (this.isValid(values)) {

            // fetch the third and the fourth column
            String time = values.get(3);
            String year = values.get(2)
                .substring(values.get(2).length() - 4);

            // convert time to minutes (e.g. 1542 -> 942)
            int minutes = Integer.parseInt(time.substring(0, 2))
                * 60 + Integer.parseInt(time.substring(2, 4));

            // create the aggregate atom (a/n)
            // with a = time in minutes and n = 1
            context.write(
                new LongWritable(Integer.parseInt(year)),
                new Text(Integer.toString(minutes) + ":1")
            );
        } else {
            // invalid line format, so we increment a counter
            context.getCounter(Driver.Counters.DISCARDED_ENTRY)
                .increment(1);
        }
    }

    protected boolean isValid(ArrayList<String> values) {
        return values.size() > 3
            && values.get(2).length() == 10
            && values.get(3).length() == 4;
    }

    // minimal CSV parser that honors double-quoted fields
    protected ArrayList<String> parse(String line) {
        ArrayList<String> values = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean escaping = false;

        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);

            if (c == '"') {
                escaping = !escaping;
            } else if (c == ',' && !escaping) {
                values.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }

        values.add(current.toString());

        return values;
    }
}

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long a = 0L;
        long n = 0L;

        // calculate intermediate aggregates
        for (Text value : values) {
            String[] atom = value.toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

        // emit in the same a:n format the mapper uses
        context.write(key, new Text(a + ":" + n));
    }
}

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long a = 0L;
        long n = 0L;

        // calculate the final aggregate
        for (Text value : values) {
            String[] atom = value.toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

        // round to whole minutes (cut off the seconds);
        // the cast to double avoids truncating integer division
        int average = (int) Math.round((double) a / n);

        // convert the average minutes back to a time of day (h:mm)
        context.write(
            key,
            new Text(String.format("%d:%02d", average / 60, average % 60))
        );
    }
}
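
Note that the combiner emits its output in exactly the same <sum>:<count> format the mapper uses. This is essential: Hadoop may run the combiner zero, one, or several times on the map output, so the reducer must be able to consume mapper and combiner records alike.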

Upvotes: 1

mkaul

Reputation: 130

Average is sum / size. If the sum is something like sum = k1 + k2 + k3 + ..., you can divide by size either after or during the summation. So the average is also k1 / size + k2 / size + k3 / size + ...

The Java 8 code is simple:

    public double average(List<Valuable> list) {
        final int size = list.size();
        return list
            .stream()
            .mapToDouble(element -> element.someValue())
            .reduce(0, (sum, x) -> sum + x / size);
    }

So you first map each element of the list to a double and then sum up via the reduce function.
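
For completeness, here is a minimal self-contained sketch of how this could be wired up; Valuable is assumed here to be a functional interface with a single someValue() method, and the demo class name is made up:

    import java.util.Arrays;
    import java.util.List;

    interface Valuable {
        double someValue();
    }

    public class AverageDemo {

        static double average(List<Valuable> list) {
            final int size = list.size();
            return list
                .stream()
                .mapToDouble(element -> element.someValue())
                .reduce(0, (sum, x) -> sum + x / size);
        }

        public static void main(String[] args) {
            // each lambda implements someValue()
            List<Valuable> numbers = Arrays.<Valuable>asList(
                () -> 1.0, () -> 2.0, () -> 3.0, () -> 4.0
            );
            System.out.println(average(numbers)); // prints 2.5
        }
    }

Dividing each element by size before adding keeps the running sum in the same order of magnitude as the final result; the trade-off is one extra division per element compared to a single division at the end.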

Upvotes: 2

Pankaj Khattar

Reputation: 111

Use all three of Mapper, Combiner, and Reducer to solve the issue. Refer to the link below for the complete code and explanation:

http://alchemistviews.blogspot.com/2013/08/calculate-average-in-map-reduce-using.html

Upvotes: 2

Sibimon Sasidharan

Reputation: 460

The logic is quite simple: if all the numbers have the same key, the mapper sends all the values you want to average to a single reducer under that key. In the reducer you can then sum the values from the iterator while keeping a counter of how many times the iterator advances, which tells you how many items are being averaged. After the loop you get the average by dividing the sum by that count.

Be careful: this logic will not work if the combiner class is set to the same class as the reducer, because the combiner would collapse each map's values into a single average and the reducer would then average those averages without knowing how many items each one represents.
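
For illustration, here is a minimal sketch of such a reducer; it assumes the mapper emits every number as a DoubleWritable under one shared key, and the class name is made up:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;

        // sum the values and count how often the iterator advances
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }

        // the average is the sum divided by the number of items
        context.write(key, new DoubleWritable(sum / count));
    }
}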

Upvotes: 11
