Argho Chatterjee

Reputation: 599

How does Hadoop Reducer Work?

I was facing an issue understanding how Hadoop invokes its reduce method. Does it create a new instance of the Reducer class for every value passed to the reducer, or does it create a single instance of the Reducer class on a particular data node and invoke the reduce method repeatedly, passing different values to that same instance?

I understand that multiple reducers mean multiple instances on different nodes. My question is about a particular node: does Hadoop create multiple instances of the Reducer class there when invoking it?

I am using hadoop 1.x at the moment.

It would be great if some documentation links were also provided.

Below is the reducer call in the MRUnit test framework, taken from the MapReduceDriver class, which appears to invoke a different reducer instance for each key/value pair. For example, k1v1 and k2v2 are run on two separate instances of the reducer class:

```java
@Override
public List<Pair<K3, V3>> run() throws IOException {
  if (inputList.isEmpty()) {
    throw new IllegalStateException("No input was provided");
  }
  if (myMapper == null) {
    throw new IllegalStateException("No Mapper class was provided");
  }
  if (myReducer == null) {
    throw new IllegalStateException("No Reducer class was provided");
  }

  List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();

  // run map component
  for (final Pair<K1, V1> input : inputList) {
    LOG.debug("Mapping input " + input.toString() + ")");

    mapOutputs.addAll(MapDriver.newMapDriver(myMapper).withInput(input)
        .withCounters(getCounters()).withConfiguration(configuration).run());
  }

  if (myCombiner != null) {
    // User has specified a combiner. Run this and replace the mapper outputs
    // with the result of the combiner.
    LOG.debug("Starting combine phase with combiner: " + myCombiner);
    mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
        shuffle(mapOutputs), myCombiner);
  }

  // Run the reduce phase.
  LOG.debug("Starting reduce phase with reducer: " + myReducer);
  return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
      myReducer);
}
```

Thanks Argho.

Upvotes: 0

Views: 595

Answers (1)

suresiva

Reputation: 3173

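No. Within a single reduce task (a reduce container under YARN, or a reduce task slot in Hadoop 1.x), only one instance of the Reducer class is created. If a node runs several reduce tasks at the same time, each task has its own instance.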

The ReduceTask class is responsible for running the configured reducer inside the reduce task. It creates exactly one instance of the configured Reducer class using reflection and then repeatedly invokes that instance's reduce() method, once per key, passing an iterator over all the values for that key. The reducer is therefore not re-created for each value or for each key.
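The pattern described above (one reflective instantiation, then one reduce() call per key) can be illustrated with a minimal, self-contained Java sketch. It does not use Hadoop: SimpleReducer, SumReducer and runReduceTask are hypothetical names invented for this illustration, and the input is assumed to be already sorted and grouped by key, as the shuffle phase would deliver it. Hadoop's ReduceTask performs the instantiation through its ReflectionUtils.newInstance helper rather than calling the constructor directly as done here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SingleReducerInstanceDemo {

    // Hypothetical reducer contract for this sketch; NOT the Hadoop Reducer interface.
    public interface SimpleReducer {
        void reduce(String key, Iterable<Integer> values, List<String> output);
    }

    // Example reducer that sums the values of a key and counts its own constructions.
    public static class SumReducer implements SimpleReducer {
        public static int instancesCreated = 0;

        public SumReducer() {
            instancesCreated++;
        }

        @Override
        public void reduce(String key, Iterable<Integer> values, List<String> output) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            output.add(key + "=" + sum);
        }
    }

    // Mimics the described ReduceTask pattern: instantiate the configured class once
    // via reflection, then call reduce() once per key with all of that key's values.
    // Assumes groupedInput is already sorted and grouped by key (the shuffle's job).
    public static List<String> runReduceTask(Class<? extends SimpleReducer> reducerClass,
                                             Map<String, List<Integer>> groupedInput) {
        SimpleReducer reducer;
        try {
            reducer = reducerClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + reducerClass, e);
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : groupedInput.entrySet()) {
            reducer.reduce(group.getKey(), group.getValue(), output);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        grouped.put("k1", Arrays.asList(1, 2, 3));
        grouped.put("k2", Arrays.asList(10, 20));
        List<String> out = runReduceTask(SumReducer.class, grouped);
        System.out.println(out);                         // prints [k1=6, k2=30]
        System.out.println(SumReducer.instancesCreated); // prints 1
    }
}
```

Two keys are processed, but the constructor runs only once, which mirrors the one-instance-per-reduce-task behaviour described in this answer.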

Please refer to the grepcode URL below for more details:

http://grepcode.com/file/repo1.maven.org/maven2/com.ning/metrics.action/0.2.0/org/apache/hadoop/mapred/ReduceTask.java#ReduceTask.runOldReducer%28org.apache.hadoop.mapred.JobConf%2Corg.apache.hadoop.mapred.TaskUmbilicalProtocol%2Corg.apache.hadoop.mapred.Task.TaskReporter%2Corg.apache.hadoop.mapred.RawKeyValueIterator%2Corg.apache.hadoop.mapred.RawComparator%2Cjava.lang.Class%2Cjava.lang.Class%29

The MapReduceDriver approach is used for unit testing, where the reducer is run through a separate ReduceDriver class using the myReducer instance that was supplied. For the list of key/value-list pairs produced by the shuffle() method, one ReduceDriver instance is created for each pair in the returned list. This is done to simulate the partitioner and the multiple-reducer scenario. Refer to the source of the MapReduceDriver.runReduce() method and the ReduceDriver class for a better understanding.
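The difference between driver objects and reducer objects in the test harness can be seen in a small, self-contained Java sketch. It does not use MRUnit: SimpleReducer, GroupDriver, buildDrivers and the simplified shuffle() below are hypothetical names, and the sketch assumes, as in the run() method quoted in the question, that every per-group driver is handed the same myReducer object rather than a freshly constructed one. Under that assumption several drivers exist while only one reducer object is involved.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ShuffleAndDriverDemo {

    // Hypothetical reducer contract for this sketch; not the Hadoop or MRUnit API.
    public interface SimpleReducer {
        String reduce(String key, List<Integer> values);
    }

    // Hypothetical stand-in for a per-group test driver. It only holds a reference
    // to the reducer it was given; it never creates a new reducer.
    public static final class GroupDriver {
        private final SimpleReducer reducer;
        private final String key;
        private final List<Integer> values;

        GroupDriver(SimpleReducer reducer, String key, List<Integer> values) {
            this.reducer = reducer;
            this.key = key;
            this.values = values;
        }

        public SimpleReducer reducer() {
            return reducer;
        }

        public String run() {
            return reducer.reduce(key, values);
        }
    }

    // Simplified shuffle: sort map outputs by key and collect all values per key.
    public static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutputs) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped;
    }

    // One driver per key group, all wrapping the same reducer object.
    public static List<GroupDriver> buildDrivers(SimpleReducer reducer,
                                                 Map<String, List<Integer>> grouped) {
        List<GroupDriver> drivers = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            drivers.add(new GroupDriver(reducer, group.getKey(), group.getValue()));
        }
        return drivers;
    }

    // Counts distinct reducer objects by identity, not by equals().
    public static int distinctReducers(List<GroupDriver> drivers) {
        Set<SimpleReducer> seen =
            Collections.newSetFromMap(new IdentityHashMap<SimpleReducer, Boolean>());
        for (GroupDriver driver : drivers) {
            seen.add(driver.reducer());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutputs = new ArrayList<>();
        mapOutputs.add(new AbstractMap.SimpleEntry<>("k2", 2));
        mapOutputs.add(new AbstractMap.SimpleEntry<>("k1", 1));
        mapOutputs.add(new AbstractMap.SimpleEntry<>("k2", 3));

        SimpleReducer echo = (key, values) -> key + "=" + values;
        List<GroupDriver> drivers = buildDrivers(echo, shuffle(mapOutputs));
        for (GroupDriver driver : drivers) {
            System.out.println(driver.run()); // k1=[1], then k2=[2, 3]
        }
        System.out.println(drivers.size() + " drivers, "
            + distinctReducers(drivers) + " reducer object(s)"); // 2 drivers, 1 reducer object(s)
    }
}
```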

In an actual MapReduce job, however, only one Reducer instance is created per reduce task on a node, as described above.

Upvotes: 1
