cloud_anny

Reputation: 37

Where does the map method get called?

I am looking into the internal working of the map method in Hadoop. Where does the map method get called? Is it a run method that calls the map method?

Upvotes: 1

Views: 1013

Answers (2)

Ravindra babu

Reputation: 38910

I am quoting the example code from the Apache documentation page to answer your queries further.

The driver class, which has the main method for the word count example, is defined as follows.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Now, from the grepcode website for the Job class, trace back what happens when you call the waitForCompletion method of the Job class.

/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

Now check the submit() method code in the Job class.

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() throws IOException, InterruptedException, 
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
   }

Now, from the grepcode site for the JobClient class, check the source code of:

public RunningJob submitJobInternal(final JobConf job)
                             throws FileNotFoundException,
                                    ClassNotFoundException,
                                    InterruptedException,
                                    IOException

Refer to the post below for the internals, along with grepcode.

What is the difference between JobClient.java and JobSubmitter.java in hadoop2?
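
To answer the run() part of your question directly: in the new MapReduce API, the map task created by the framework calls the Mapper's run() method, and run() in turn calls map() once for every key/value pair in the input split. A simplified sketch of what org.apache.hadoop.mapreduce.Mapper.run() does (the exact code varies slightly between Hadoop versions) is:

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // The framework feeds one record at a time from the input split;
    // map() is invoked once for every key/value pair.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }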

Upvotes: 1

arcee123

Reputation: 211

This is a basic example of a MapReduce program written in Java. You can also use Hadoop Streaming for other languages such as Python and C++, but Java is the native language.

The Map and Reduce classes are registered in the main class, where the job settings such as the input path, output path, and runtime parameters are established; the framework then calls their map and reduce methods:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     // Called once per input record: emit <word, 1> for every token in the line.
     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       String line = value.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
         output.collect(word, one);
       }
     }
   }

   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
     // Called once per unique key: sum all the 1s emitted for this word.
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       int sum = 0;
       while (values.hasNext()) {
         sum += values.next().get();
       }
       output.collect(key, new IntWritable(sum));
     }
   }

   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf(WordCount.class);
     conf.setJobName("wordcount");

     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);

     conf.setMapperClass(Map.class);
     conf.setCombinerClass(Reduce.class);
     conf.setReducerClass(Reduce.class);

     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(TextOutputFormat.class);

     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));

     JobClient.runJob(conf);
   }
}

You can see this fully described in the Apache tutorial here: https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Example%3A+WordCount+v1.0

In this example, the map function breaks each line into words and emits them as key-value pairs such as <word, 1>; the framework then sorts and groups the pairs by key and hands them off to the reduce function, which performs the aggregation.
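
To make that hand-off concrete, here is a small standalone sketch (plain Java, not Hadoop code; the class name WordCountFlow is made up for illustration) that mimics what the framework does between map and reduce on a single line of input:

import java.util.*;

public class WordCountFlow {
  public static void main(String[] args) {
    String input = "Hello World Bye World";

    // "Map" phase: emit <word, 1> for every token, like Map.map() above.
    List<AbstractMap.SimpleEntry<String, Integer>> mapOutput = new ArrayList<>();
    for (String token : input.split("\\s+")) {
      mapOutput.add(new AbstractMap.SimpleEntry<>(token, 1));
    }

    // Shuffle/sort: the framework groups the emitted values by key.
    SortedMap<String, List<Integer>> grouped = new TreeMap<>();
    for (AbstractMap.SimpleEntry<String, Integer> pair : mapOutput) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }

    // "Reduce" phase: sum the values for each key, like Reduce.reduce() above.
    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      int sum = 0;
      for (int count : entry.getValue()) {
        sum += count;
      }
      System.out.println(entry.getKey() + "\t" + sum);   // prints: Bye 1, Hello 1, World 2
    }
  }
}

The real framework performs the same grouping, but distributed across the cluster rather than in a single in-memory map.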

This only scratches the surface, but it captures the main concept: the map phase creates the key-value pairs needed for aggregation, and the reduce phase aggregates them and writes the result back. Both run on the data nodes, which is what gives distributed processing its advantage in speed.

Hope this helps.

Upvotes: -1
