usdq777

Reputation: 207

MapReduce - How to group and aggregate multiple attributes in a single job

I am currently struggling a bit with MapReduce. I have the following dataset:

1,John,Computer
2,Anne,Computer
3,John,Mobile
4,Julia,Mobile
5,Jack,Mobile
6,Jack,TV
7,John,Computer
8,Jack,TV
9,Jack,TV
10,Anne,Mobile
11,Anne,Computer
12,Julia,Mobile

Now I want to apply MapReduce with grouping and aggregation to this dataset, so that the output shows not only how many purchases each person made, but also which product that person ordered most often.

So the output should look like:

John 3 Computer
Anne 3 Mobile
Jack 4 TV
Julia 2 Mobile

My current mapper and reducer look like this. They correctly return how many orders each person made, but I am clueless about how to get the desired output.

static class CountMatchesMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context ctx) throws IOException, InterruptedException {
        // Each input line has the form: id,name,product
        String row = value.toString();
        String[] row_part = row.split(",");
        // Emit (name, 1); exceptions propagate to the framework instead of being swallowed.
        ctx.write(new Text(row_part[1]), new IntWritable(1));
    }
}


static class CountMatchesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper to get the number of orders per person.
        int i = 0;
        for (IntWritable value : values) {
            i += value.get();
        }
        // Exceptions propagate to the framework instead of being swallowed.
        ctx.write(key, new IntWritable(i));
    }
}

I would really appreciate any efficient solution and help.

Thanks in advance!

Upvotes: 0

Views: 4155

Answers (1)

PetrosP

Reputation: 665

If I understand correctly what you want, the second output line should be:

Anne 3 Computer

based on the input. Anne has bought 3 products in total: 2 Computers and 1 Mobile.
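
That tally can be checked without Hadoop. Here is a minimal plain-Java sketch, assuming the twelve sample rows from the question are hard-coded; the class name `TallyCheck` and its `summarize` method are hypothetical helpers, not part of the job itself:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, only for sanity-checking the expected output.
class TallyCheck {
    // Sample rows copied from the question: id,name,product
    static final String[] ROWS = {
        "1,John,Computer", "2,Anne,Computer", "3,John,Mobile", "4,Julia,Mobile",
        "5,Jack,Mobile", "6,Jack,TV", "7,John,Computer", "8,Jack,TV",
        "9,Jack,TV", "10,Anne,Mobile", "11,Anne,Computer", "12,Julia,Mobile"
    };

    // Returns one "name total topProduct" line per person, in first-seen order.
    static List<String> summarize(String[] rows) {
        Map<String, Map<String, Integer>> perPerson = new LinkedHashMap<>();
        for (String row : rows) {
            String[] parts = row.split(",");
            perPerson.computeIfAbsent(parts[1], k -> new LinkedHashMap<>())
                     .merge(parts[2], 1, Integer::sum);
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<String, Integer>> person : perPerson.entrySet()) {
            int total = 0;
            int max = 0;
            String top = "";
            for (Map.Entry<String, Integer> e : person.getValue().entrySet()) {
                total += e.getValue();
                if (e.getValue() > max) {
                    max = e.getValue();
                    top = e.getKey();
                }
            }
            lines.add(person.getKey() + " " + total + " " + top);
        }
        return lines;
    }

    public static void main(String[] args) {
        summarize(ROWS).forEach(System.out::println);
    }
}
```

For the sample data this yields a Computer as the top product for Anne, and the lines for John, Jack and Julia match the ones in the question.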

Here is a very basic approach. It doesn't handle edge cases, but it should give you some direction:

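// Imports shared by the mapper and reducer below.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

static class CountMatchesMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // Each input line has the form: id,name,product. Emit (name, product).
        String row = value.toString();
        String[] row_part = row.split(",");
        outputKey.set(row_part[1]);
        outputValue.set(row_part[2]);
        ctx.write(outputKey, outputValue);
    }
}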

static class CountMatchesReducer extends Reducer<Text, Text, Text, NullWritable> {
    private Text output = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context ctx) throws IOException, InterruptedException {
        // Per-product purchase counts for the person in `key`.
        Map<String, Integer> productCounts = new HashMap<>();

        int totalProductsBought = 0;
        for (Text value : values) {
            String productBought = value.toString();
            int count = 0;
            if (productCounts.containsKey(productBought)) {
                count = productCounts.get(productBought);
            }
            productCounts.put(productBought, count + 1);
            totalProductsBought += 1;
        }

        // Emit a single line of the form: name total topProduct
        String topProduct = getTopProductForPerson(productCounts);
        output.set(key.toString() + " " + totalProductsBought + " " + topProduct);
        ctx.write(output, NullWritable.get());
    }

    // Returns the product with the highest count; ties are resolved arbitrarily
    // because HashMap iteration order is unspecified.
    private String getTopProductForPerson(Map<String, Integer> productCounts) {
        String topProduct = "";
        int maxCount = 0;
        for (Map.Entry<String, Integer> productCount : productCounts.entrySet()) {
            if (productCount.getValue() > maxCount) {
                maxCount = productCount.getValue();
                topProduct = productCount.getKey();
            }
        }
        return topProduct;
    }
}

The above will give the output that you described, with the corrected line for Anne.
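
One edge case it leaves open is a tie. `HashMap` iteration order is unspecified, so when two products share the highest count, `getTopProductForPerson` may return either of them. Below is a sketch of a deterministic variant, assuming that alphabetical order is an acceptable tie-breaker. That rule is my assumption, since the question does not say what should happen on a tie, and the wrapper class `TopProduct` is hypothetical:

```java
import java.util.Map;

// Hypothetical wrapper class so the method can be tried outside Hadoop.
class TopProduct {
    // Highest count wins; on equal counts the alphabetically smaller
    // product name wins (assumed rule, not specified in the question).
    static String getTopProductForPerson(Map<String, Integer> productCounts) {
        String topProduct = "";
        int maxCount = 0;
        for (Map.Entry<String, Integer> entry : productCounts.entrySet()) {
            int count = entry.getValue();
            String product = entry.getKey();
            boolean higher = count > maxCount;
            boolean tieButEarlier = count == maxCount && product.compareTo(topProduct) < 0;
            if (higher || tieButEarlier) {
                maxCount = count;
                topProduct = product;
            }
        }
        return topProduct;
    }
}
```

With counts of TV=2, Mobile=2 and Computer=1 this variant always picks Mobile, whatever order the map is iterated in.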

If you want a solution that scales, you probably need a composite key (for example person plus product) and a custom grouping comparator. That way you can also add a Combiner, which makes the job much more efficient. However, the approach above should work for the average case.
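
To show why the composite key makes a Combiner possible, here is a plain-Java simulation of that data flow. It is a sketch, not Hadoop code: the class `CompositeKeySketch`, its method names, and the tab-joined string standing in for the composite key are all assumptions made for illustration. In a real job the key would be a custom `WritableComparable`, the grouping comparator would be registered with `Job.setGroupingComparatorClass`, the combiner with `Job.setCombinerClass`, and the partitioner would have to send all keys for one person to the same reducer. The reducer posted above cannot double as a combiner, because a combiner must emit the same key and value types that the mapper emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the composite-key data flow (no Hadoop types).
class CompositeKeySketch {
    // Map + combine for one input split: emit ("person\tproduct", 1) per row
    // and pre-sum locally, so only partial counts would cross the network.
    static Map<String, Integer> mapAndCombine(String[] rows) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String row : rows) {
            String[] parts = row.split(",");
            partial.merge(parts[1] + "\t" + parts[2], 1, Integer::sum);
        }
        return partial;
    }

    // Reduce: merge the partial counts from all splits. Keys are sorted, so all
    // products of one person are adjacent, which is the effect a grouping
    // comparator on the person part of the key would have.
    static List<String> reduce(List<Map<String, Integer>> partials) {
        TreeMap<String, Integer> merged = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((k, v) -> merged.merge(k, v, Integer::sum));
        }
        List<String> out = new ArrayList<>();
        String person = null;
        int total = 0;
        int max = 0;
        String top = "";
        for (Map.Entry<String, Integer> e : merged.entrySet()) {
            String[] key = e.getKey().split("\t");
            if (person != null && !key[0].equals(person)) {
                // Person changed: flush the finished group and reset the state.
                out.add(person + " " + total + " " + top);
                total = 0;
                max = 0;
                top = "";
            }
            person = key[0];
            total += e.getValue();
            if (e.getValue() > max) {
                max = e.getValue();
                top = key[1];
            }
        }
        if (person != null) {
            out.add(person + " " + total + " " + top);
        }
        return out;
    }
}
```

The reduce step here only keeps a running total and the current maximum per person, so it never has to hold all products of one person in memory the way the `HashMap` version does.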

Upvotes: 1
