St.Antario

Reputation: 27375

How does Spark send jobs to a cluster?

I'm using YARN and am trying to understand how Spark sends a job to the cluster. So I dug into the sources and found that when we submit a job (e.g. a foreach) the following method is executed in SparkContext::runJob:

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

then a JobWaiter is created in

DAGScheduler::submitJob 

and a JobSubmitted event is posted. The event is handled in

DAGSchedulerEventProcessLoop::handleJobSubmitted 

which posts another event (SparkListenerJobStart) to the listener bus and then invokes

DAGScheduler::submitStage

So it seems this method should contain the logic that submits the stages to the cluster. But all I found there is this:

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

Actually, I expected to find some NIO-like networking code somewhere in there.

QUESTION: How does the driver program communicate with the cluster when we use YARN? Where is that piece of code? Could someone help me?

Upvotes: 2

Views: 177

Answers (1)

Nilanjan Sarkar

Reputation: 192

As you know, Spark can run on multiple cluster managers. It achieves this by using an abstraction called SchedulerBackend.

For YARN, there are two implementations, both extending the common YarnSchedulerBackend:

  1. YarnClientSchedulerBackend (for client deploy mode)
  2. YarnClusterSchedulerBackend (for cluster deploy mode)

Here is the source code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
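To make the abstraction concrete, here is a minimal, self-contained sketch. The trait's method names mirror Spark's real SchedulerBackend, but ToyBackend, its task queue, and all method bodies are purely illustrative, not Spark's actual implementation:

```scala
import scala.collection.mutable

// The cluster-manager-specific half of scheduling. Method names mirror
// Spark's real SchedulerBackend trait; the bodies below are illustrative.
trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  // Called by the task scheduler when new tasks are ready: the backend
  // matches free executor resources to pending tasks and launches them.
  def reviveOffers(): Unit
  def defaultParallelism(): Int
}

// ToyBackend is a made-up stand-in: it "launches" tasks into a local buffer,
// where a real backend (e.g. CoarseGrainedSchedulerBackend, which the YARN
// backends extend) would send LaunchTask messages to executors over RPC.
class ToyBackend(pending: mutable.Queue[String]) extends SchedulerBackend {
  val launched = mutable.Buffer.empty[String]
  def start(): Unit = ()
  def stop(): Unit = ()
  def defaultParallelism(): Int = 2
  def reviveOffers(): Unit =
    while (pending.nonEmpty) launched += pending.dequeue()
}
```

This is also the answer to where the "NIO-like" code lives: DAGScheduler never touches the network itself. submitMissingTasks hands a TaskSet to the TaskScheduler, which calls reviveOffers() on whichever backend was plugged in, and the actual networking (Netty-based RPC in CoarseGrainedSchedulerBackend) sits behind that call.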

Upvotes: 1
