Raghu

Reputation: 11

Hadoop - Finding the total number of IP hits and unique IP addresses, then finding the average (total IP hits / unique IPs)

I am trying to learn Hadoop. I have written MapReduce code to find the total number of IP hits and the number of unique IP addresses, and then to find the average (total IP hits / unique IPs).

However, the output I get is every IP address along with its hit count; I cannot get the average.

Code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public final class IPAddress {
    private final static IntWritable ONE = new IntWritable(1);

    static int totalHits = 0, uniqueIP = 0; // written in IPReduce.reduce(), read in main() after the job finishes
    public final static void main(final String[] args) throws Exception 
    {
        final Configuration conf = new Configuration();

        final Job job = new Job(conf, "IPAddress");
        job.setJarByClass(IPAddress.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(IPMap.class);
        job.setCombinerClass(IPReduce.class);
        job.setReducerClass(IPReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        int average = totalHits/uniqueIP;
        System.out.print("Average is :"+average+"\n");
    }

    public static final class IPMap extends Mapper<LongWritable, Text, Text, IntWritable> 
    {
        private final Text mapKey = new Text();

        public final void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException 
        {
            final String line = value.toString();
            final String[] data = line.trim().split("- -");
            if (data.length > 1) 
            {
                final String ipAddress = data[0];
                mapKey.set(ipAddress);
                context.write(mapKey, ONE);
            }
        }
    }

    public static final class IPReduce extends Reducer<Text, IntWritable, Text, IntWritable> 
    {

        public final void reduce(final Text key, final Iterable<IntWritable> values, final Context context) throws IOException, InterruptedException 
        {
            int count = 0, sum = 0, distinctIpCount=0;
            for (final IntWritable val : values) 
            {
                count += val.get();
                sum += count;
                distinctIpCount++;
            }
            totalHits = count;
            uniqueIP = distinctIpCount;
            context.write(key, new IntWritable(count));
        }
    }
}

Upvotes: 1

Views: 3026

Answers (1)

Jeremy Beard

Reputation: 2725

An important point about the execution of MapReduce jobs is that even though you provide all of your code in one class, the MapReduce framework pulls out the mapper and reducer classes you provide and sends them to the worker nodes to execute, while the main() method runs in the local JVM that you launch your job from. This means the mapper and reducer methods have no visibility into any variables you define outside of the mapper and reducer classes: your static totalHits and uniqueIP are updated on the workers, never in the driver JVM.
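To make that concrete with the posted code (on a cluster, that is; local mode runs everything in one JVM and can mask the problem):

job.waitForCompletion(true);
// This runs in the driver JVM, where IPReduce's assignments never happened,
// so totalHits and uniqueIP are both still 0 here and the division below
// throws ArithmeticException (division by zero).
int average = totalHits / uniqueIP;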

Specifically for your use case: if you want to work out the average hits across all IP addresses, you can force the job to use a single reducer when you invoke it (-D mapred.reduce.tasks=1 on the command line, or job.setNumReduceTasks(1) in the driver), so that you can define totalHits and uniqueIP as instance fields of IPReduce and every reduce() call will see the same instance of those variables. You can then calculate your average in the reducer's cleanup() method, which runs once after all the reduce() calls have finished.
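A minimal sketch of what that could look like, assuming a single reducer and reusing the imports from your class (the class and field names here are just suggestions, not from your original code):

public static final class IPAverageReduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
    // Instance fields: with a single reducer, every reduce() call runs in the
    // same task, so these accumulate across all keys.
    private int totalHits = 0;
    private int uniqueIP = 0;

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context) throws IOException, InterruptedException
    {
        int count = 0;
        for (final IntWritable val : values)
        {
            count += val.get();  // total hits for this IP
        }
        totalHits += count;      // running total across all IPs
        uniqueIP++;              // reduce() is called once per distinct IP
        context.write(key, new IntWritable(count));
    }

    @Override
    protected void cleanup(final Context context) throws IOException, InterruptedException
    {
        // Called once after the last reduce(); a safe place for the average.
        if (uniqueIP > 0)
        {
            context.write(new Text("Average"), new IntWritable(totalHits / uniqueIP));
        }
    }
}

Note that this class should not also be set as the combiner: combiners run independently inside each map task, so their counts are partial and cleanup() would emit spurious averages. Keep a plain summing reducer (like your current IPReduce) as the combiner.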

You won't easily be able to send this value back to the main program to print to the screen, but you can either emit the average as part of your job output (cleanup() is handed the same Context object as reduce()), or, if you want to leave the per-IP counts as the main job output, write the average to a file in HDFS using the HDFS API.
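For the second option, a rough sketch of a cleanup() that writes the average to a side file instead (the path is just an example, and this assumes imports for org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.FSDataOutputStream alongside the Path import you already have):

@Override
protected void cleanup(final Context context) throws IOException, InterruptedException
{
    // Write the average to a side file in HDFS rather than the job output.
    final FileSystem fs = FileSystem.get(context.getConfiguration());
    try (final FSDataOutputStream out = fs.create(new Path("/tmp/ip-average.txt")))  // example path
    {
        out.writeBytes("Average is: " + (uniqueIP > 0 ? (totalHits / uniqueIP) : 0) + "\n");
    }
}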

Upvotes: 1
