Reputation: 27375
I'm using YARN and tried to understand how Spark submits a job to a cluster when running on YARN. So I dug into the sources and found that when we submit a job (e.g. foreach), the following method is executed in SparkContext::runJob:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
Then a JobWaiter is created in DAGScheduler::submitJob and a JobSubmitted event is posted. The event is handled in DAGSchedulerEventProcessLoop::handleJobSubmitted, which publishes another event (SparkListenerJobStart) to the listener bus and then invokes DAGScheduler::submitStage.
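For context, here is roughly what DAGScheduler::submitJob does; this is paraphrased from the sources I was reading, so exact signatures may differ between Spark versions:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  val jobId = nextJobId.getAndIncrement()
  // the waiter the caller blocks on until the job finishes
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // no network I/O here: the event just goes onto an in-process queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func, partitions.toArray, callSite, waiter, properties))
  waiter
}

So up to this point everything happens inside the driver JVM.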
So it seemed that submitStage should contain the logic that actually ships stages to the cluster. But all I found there is this:
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
Actually, I expected to find some NIO-like code somewhere in there.
QUESTION: How does the driver program communicate with the cluster when we use YARN? Where is that piece of code? Could someone help me?
Upvotes: 2
Views: 177
Reputation: 192
As you know, Spark can run on multiple cluster managers. Spark achieves this through an abstraction called SchedulerBackend.
For YARN there are two implementations: YarnClientSchedulerBackend (for client deploy mode) and YarnClusterSchedulerBackend (for cluster deploy mode). Both ultimately extend CoarseGrainedSchedulerBackend, which keeps an RPC endpoint on the driver. When the DAGScheduler finally calls submitMissingTasks, the tasks go to the TaskScheduler and from there to this backend, which serializes them and sends them to the executors as LaunchTask messages over Spark's Netty-based RPC layer (org.apache.spark.rpc.netty). That RPC layer is where the "NIO-like" code you were looking for lives; the DAGScheduler itself never touches the network.
Here is the source code: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
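For reference, the trait itself is small; this is its approximate shape, trimmed from the file linked above (details differ between Spark versions):

private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  // ask the backend to offer its free resources to the TaskScheduler,
  // which answers with the tasks to launch on those resources
  def reviveOffers(): Unit
  def defaultParallelism(): Int
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
  def isReady(): Boolean = true
}

The chain on the driver side is: submitMissingTasks calls taskScheduler.submitTasks(...), which ends with backend.reviveOffers(); in CoarseGrainedSchedulerBackend that posts a message to the driver's RPC endpoint, which matches pending tasks to executor resources and ships them out as LaunchTask messages.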
Upvotes: 1