Bruckwald

Reputation: 797

Hadoop: how are reducer nodes selected?

I just started learning Hadoop, but I don't understand how a datanode becomes a reducer node.

How does the JobTracker decide which node becomes a reducer node? I'm reading Hadoop: The Definitive Guide, but this step is not mentioned in the book.

Thanks, Bruckwald

Upvotes: 3

Views: 746

Answers (1)

Thomas Jungblut

Reputation: 20969

Pretty much first come, first served. Tasks are assigned via heartbeats: when a TaskTracker pings the JobTracker to report that it is alive, the response it gets back may contain a new task to run:

List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
   tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
   for (Task task : tasks) {
     expireLaunchingTasks.addNewTask(task.getTaskID());
     LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
     actions.add(new LaunchTaskAction(task));
   }
}

The snippet above is from the JobTracker source code. So besides which TaskTracker heartbeats first, the task scheduler also checks resource conditions (e.g. whether the tracker has a free slot, or whether the node is overloaded).

The relevant scheduler code, which isn't particularly exciting, looks like this:

//
// Same thing, but for reduce tasks
// However we _never_ assign more than 1 reduce task per heartbeat
//
final int trackerCurrentReduceCapacity = 
  Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
           trackerReduceCapacity);
final int availableReduceSlots = 
  Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
  exceededReducePadding = exceededPadding(false, clusterStatus, 
                                          trackerReduceCapacity);
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() != JobStatus.RUNNING ||
          job.numReduceTasks == 0) {
        continue;
      }

      Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());                             
      if (t != null) {
        assignedTasks.add(t);
        break;
      }

      // Don't assign reduce tasks to the hilt!
      // Leave some free slots in the cluster for future task-failures,
      // speculative tasks etc. beyond the highest priority job
      if (exceededReducePadding) {
        break;
      }
    }
  }
}
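
To make the capacity arithmetic at the top of that excerpt concrete, here is a small standalone sketch that reproduces just the two Math.min expressions outside of Hadoop. The class name ReduceSlotMath is made up for illustration, and reduceLoadFactor is treated as a plain input between 0 and 1, since the excerpt does not show how it is computed.

```java
// Illustrative only: mirrors the arithmetic from the excerpt, not Hadoop itself.
// ReduceSlotMath is a hypothetical name.
final class ReduceSlotMath {

    /**
     * Returns the number of reduce tasks a tracker may receive on one heartbeat.
     * The result is never more than 1 and can be zero or negative, in which
     * case the scheduler's "availableReduceSlots > 0" check skips the tracker.
     */
    static int availableReduceSlots(double reduceLoadFactor,
                                    int trackerReduceCapacity,
                                    int trackerRunningReduces) {
        // Scale the tracker's capacity by the load factor, never exceeding it.
        int currentCapacity = Math.min(
                (int) Math.ceil(reduceLoadFactor * trackerReduceCapacity),
                trackerReduceCapacity);
        // Cap at 1: at most one reduce task is handed out per heartbeat.
        return Math.min(currentCapacity - trackerRunningReduces, 1);
    }

    public static void main(String[] args) {
        System.out.println(availableReduceSlots(0.5, 4, 1)); // prints 1
        System.out.println(availableReduceSlots(0.5, 4, 3)); // prints -1
        System.out.println(availableReduceSlots(1.0, 4, 0)); // prints 1
    }
}
```

The third call shows the cap in action: four slots are free, but the tracker is still only eligible for a single reduce task on that heartbeat.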

Basically, the first TaskTracker that heartbeats to the JobTracker and has enough free slots will get a reduce task.
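
As a rough illustration of that first come, first served behaviour, here is a toy model of heartbeat-driven assignment. It is not Hadoop code: HeartbeatSketch, Tracker, and heartbeat are hypothetical names, and the model ignores job priorities, padding, and load factors entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only; every name in this class is hypothetical.
final class HeartbeatSketch {

    /** A tracker with a fixed number of reduce slots. */
    static final class Tracker {
        final String name;
        final int reduceCapacity;
        int runningReduces;

        Tracker(String name, int reduceCapacity, int runningReduces) {
            this.name = name;
            this.reduceCapacity = reduceCapacity;
            this.runningReduces = runningReduces;
        }

        boolean hasFreeReduceSlot() {
            return runningReduces < reduceCapacity;
        }
    }

    private final Deque<String> pendingReduces = new ArrayDeque<>();
    final List<String> assignments = new ArrayList<>();

    HeartbeatSketch(List<String> reduceTaskIds) {
        pendingReduces.addAll(reduceTaskIds);
    }

    /** Handles one heartbeat; returns the assigned task ID, or null if none. */
    String heartbeat(Tracker tracker) {
        if (pendingReduces.isEmpty() || !tracker.hasFreeReduceSlot()) {
            return null; // nothing to do, or the tracker is full
        }
        String task = pendingReduces.poll(); // at most one task per heartbeat
        tracker.runningReduces++;
        assignments.add(tracker.name + " -> " + task);
        return task;
    }

    public static void main(String[] args) {
        HeartbeatSketch jobTracker =
                new HeartbeatSketch(Arrays.asList("reduce_0", "reduce_1"));
        Tracker busy = new Tracker("tt-busy", 2, 2); // no free slots
        Tracker a = new Tracker("tt-a", 2, 0);
        Tracker b = new Tracker("tt-b", 2, 0);

        jobTracker.heartbeat(busy); // arrives first, but is full
        jobTracker.heartbeat(a);
        jobTracker.heartbeat(b);
        jobTracker.heartbeat(a);    // no reduce tasks left

        System.out.println(jobTracker.assignments);
        // prints [tt-a -> reduce_0, tt-b -> reduce_1]
    }
}
```

In this model, which nodes end up running reducers depends only on heartbeat order and free slots, which is the point of the answer: there is no special "reducer node" chosen in advance.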

Upvotes: 6
