user1207659

Reputation: 33

Min Max Count Using Map Reduce

I have developed a MapReduce application to determine the first and last time a user commented, and the total number of comments from that user, based on the book by Donald Miner.

But the problem with my algorithm is in the reducer. I have grouped the comments by user ID. My test data contains two user IDs, each posting 3 comments on different dates, hence a total of 6 rows.

So my reducer should print two records, each showing the first and last time a user commented and the total comment count for that user ID.

But my reducer is printing six records. Can someone point out what's wrong with the following code?

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.arjun.mapreduce.patterns.mapreducepatterns.MRDPUtils;

import java.text.ParseException;

public class MinMaxCount {

    public static class MinMaxCountMapper extends 
            Mapper<Object, Text, Text, MinMaxCountTuple> {

        private Text outuserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();

        private final static SimpleDateFormat sdf = 
                     new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSS");

        @Override
        protected void map(Object key, Text value,
                org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {

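            // Parse the attributes of one <row .../> line into a key/value map.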
            Map<String, String> parsed = 
                     MRDPUtils.transformXMLtoMap(value.toString());

            String date = parsed.get("CreationDate");
            String userId = parsed.get("UserId");

            try {
                Date creationDate = sdf.parse(date);
                outTuple.setMin(creationDate);
                outTuple.setMax(creationDate);
            } catch (ParseException e) {
                System.err.println("Unable to parse Date in XML");
                System.exit(3);
            }

            outTuple.setCount(1);
            outuserId.set(userId);

            context.write(outuserId, outTuple);

        }

    }

    public static class MinMaxCountReducer extends 
            Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {

        private MinMaxCountTuple result = new MinMaxCountTuple();


        protected void reduce(Text userId, Iterable<MinMaxCountTuple> values,
                org.apache.hadoop.mapreduce.Reducer.Context context)
                throws IOException, InterruptedException {

            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            int count = 0;
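            // Fold every tuple for this user into a single min/max/count record.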
            for(MinMaxCountTuple tuple: values )
            {
                if(result.getMin() == null || 
                        tuple.getMin().compareTo(result.getMin()) < 0) 
                {
                    result.setMin(tuple.getMin());
                }

                if(result.getMax() == null ||
                        tuple.getMax().compareTo(result.getMax()) > 0)  {
                    result.setMax(tuple.getMax());
                }

                System.err.println(count++);

                sum += tuple.getCount();
            }

            result.setCount(sum);
            context.write(userId, result);
        }

    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String [] otherArgs = new GenericOptionsParser(conf, args)
                            .getRemainingArgs();
        if(otherArgs.length < 2 )
        {
            System.err.println("Usage MinMaxCout input output");
            System.exit(2);
        }


        Job job = new Job(conf, "Summarization min max count");
        job.setJarByClass(MinMaxCount.class);
        job.setMapperClass(MinMaxCountMapper.class);
        //job.setCombinerClass(MinMaxCountReducer.class);
        job.setReducerClass(MinMaxCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        boolean result = job.waitForCompletion(true);
        if(result)
        {
            System.exit(0);
        }else {
            System.exit(1);
        }

    }

}
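
For reference, MinMaxCountTuple is a custom Writable that is not shown above. The getters and setters match how it is used in the mapper and reducer; the serialization details and toString format below are a minimal sketch inferred from the output shown, not necessarily the exact class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.Writable;

public class MinMaxCountTuple implements Writable {

    private Date min = new Date();
    private Date max = new Date();
    private long count = 0;

    private final static SimpleDateFormat frmt =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public Date getMin() { return min; }
    public void setMin(Date min) { this.min = min; }
    public Date getMax() { return max; }
    public void setMax(Date max) { this.max = max; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }

    // Serialize the dates as epoch milliseconds so they round-trip exactly.
    public void readFields(DataInput in) throws IOException {
        min = new Date(in.readLong());
        max = new Date(in.readLong());
        count = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(min.getTime());
        out.writeLong(max.getTime());
        out.writeLong(count);
    }

    // Matches the tab-separated columns shown in part-r-00000.
    public String toString() {
        return frmt.format(min) + "\t" + frmt.format(max) + "\t" + count;
    }
}

It only needs to implement Writable, not WritableComparable, since it is used as a value and never as a key.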

Input: 
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-01T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="831878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-06-30T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-01T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="931878" />

Output file contents (part-r-00000):

831878  2011-07-30T07:29:33.343 2011-07-30T07:29:33.343 1
831878  2011-08-01T07:29:33.343 2011-08-01T07:29:33.343 1
831878  2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1
931878  2011-06-30T07:29:33.343 2011-06-30T07:29:33.343 1
931878  2011-07-01T07:29:33.343 2011-07-01T07:29:33.343 1
931878  2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1

Job submission output:

12/12/16 11:13:52 INFO input.FileInputFormat: Total input paths to process : 1
12/12/16 11:13:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/16 11:13:52 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/16 11:13:52 INFO mapred.JobClient: Running job: job_201212161107_0001
12/12/16 11:13:53 INFO mapred.JobClient:  map 0% reduce 0%
12/12/16 11:14:06 INFO mapred.JobClient:  map 100% reduce 0%
12/12/16 11:14:18 INFO mapred.JobClient:  map 100% reduce 100%
12/12/16 11:14:23 INFO mapred.JobClient: Job complete: job_201212161107_0001
12/12/16 11:14:23 INFO mapred.JobClient: Counters: 26
12/12/16 11:14:23 INFO mapred.JobClient:   Job Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Launched reduce tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=12264
12/12/16 11:14:23 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient:     Launched map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     Data-local map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10124
12/12/16 11:14:23 INFO mapred.JobClient:   File Output Format Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Bytes Written=342
12/12/16 11:14:23 INFO mapred.JobClient:   FileSystemCounters
12/12/16 11:14:23 INFO mapred.JobClient:     FILE_BYTES_READ=204
12/12/16 11:14:23 INFO mapred.JobClient:     HDFS_BYTES_READ=888
12/12/16 11:14:23 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43479
12/12/16 11:14:23 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=342
12/12/16 11:14:23 INFO mapred.JobClient:   File Input Format Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Bytes Read=761
12/12/16 11:14:23 INFO mapred.JobClient:   Map-Reduce Framework
12/12/16 11:14:23 INFO mapred.JobClient:     Map output materialized bytes=204
12/12/16 11:14:23 INFO mapred.JobClient:     Map input records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/12/16 11:14:23 INFO mapred.JobClient:     Spilled Records=12
12/12/16 11:14:23 INFO mapred.JobClient:     Map output bytes=186
12/12/16 11:14:23 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/12/16 11:14:23 INFO mapred.JobClient:     Combine input records=0
12/12/16 11:14:23 INFO mapred.JobClient:     SPLIT_RAW_BYTES=127
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce input records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce input groups=2
12/12/16 11:14:23 INFO mapred.JobClient:     Combine output records=0
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce output records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Map output records=6

Upvotes: 3

Views: 7525

Answers (1)

Amar

Reputation: 12010

Ah, caught the culprit. Just change your reduce method's signature to the following:

protected void reduce(Text userId, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {

Basically you just need Context and not org.apache.hadoop.mapreduce.Reducer.Context.

Now the output looks like:

831878  2011-07-30T07:29:33.343 2011-08-02T07:29:33.343 3
931878  2011-06-30T07:29:33.343 2011-08-02T07:29:33.343 3

I tested it locally for you, and this change did the trick. It is odd behavior, though, and it would be great if anyone could shed light on it. It has something to do with generics: when org.apache.hadoop.mapreduce.Reducer.Context is used, the compiler warns that

"Reducer.Context is a raw type. References to generic type Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context should be parameterized"

But when plain Context is used, it's fine.
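
The likely mechanism, for what it's worth: the raw-typed signature reduce(Text, Iterable<MinMaxCountTuple>, Reducer.Context) matches neither the parameterized reduce(Text, Iterable<MinMaxCountTuple>, Context) nor its erasure reduce(Object, Iterable, Context), so under Java's override rules it is a new overload rather than an override. Hadoop therefore invokes the inherited default reduce, which writes each input value through unchanged, producing one output record per comment. A sketch of the corrected reducer; adding @Override makes the compiler reject the non-overriding variant instead of failing silently:

public static class MinMaxCountReducer extends
        Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {

    private MinMaxCountTuple result = new MinMaxCountTuple();

    @Override  // would not compile against the raw-Context signature
    protected void reduce(Text userId, Iterable<MinMaxCountTuple> values,
            Context context) throws IOException, InterruptedException {
        result.setMin(null);
        result.setMax(null);
        int sum = 0;
        for (MinMaxCountTuple tuple : values) {
            // Track the earliest and latest comment dates for this user.
            if (result.getMin() == null
                    || tuple.getMin().compareTo(result.getMin()) < 0) {
                result.setMin(tuple.getMin());
            }
            if (result.getMax() == null
                    || tuple.getMax().compareTo(result.getMax()) > 0) {
                result.setMax(tuple.getMax());
            }
            sum += tuple.getCount();
        }
        result.setCount(sum);
        context.write(userId, result);
    }
}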

Upvotes: 4
