Vijay Innamuri

Reputation: 4372

How to get spark job status from program?

I am aware that hadoop REST API provides access to job status via program.

Similarly is there any way to get the spark job status in a program?

Upvotes: 2

Views: 12641

Answers (5)

Volodymyr Zubariev

Reputation: 443

Providing the answer for Java. In Scala it would be almost the same, just using SparkContext instead of JavaSparkContext.

Assume you have a JavaSparkContext:

private final JavaSparkContext sc;

The following code allows you to get all the info available from the Jobs and Stages tabs:

JavaSparkStatusTracker statusTracker = sc.statusTracker();
for (int jobId : statusTracker.getActiveJobIds()) {
    SparkJobInfo jobInfo = statusTracker.getJobInfo(jobId);
    log.info("Job " + jobId + " status is " + jobInfo.status().name());
    log.info("Stages status:");

    for (int stageId : jobInfo.stageIds()) {
        SparkStageInfo stageInfo = statusTracker.getStageInfo(stageId);

        log.info("Stage id=" + stageId + "; name = " + stageInfo.name()
                + "; completed tasks:" + stageInfo.numCompletedTasks()
                + "; active tasks: " + stageInfo.numActiveTasks()
                + "; all tasks: " + stageInfo.numTasks()
                + "; submission time: " + stageInfo.submissionTime());
    }
}

Unfortunately, everything else is accessible only from the Scala SparkContext, so there may be some difficulties working with the returned structures from Java.

Pools list: sc.sc().getAllPools()

Executor Memory Status: sc.sc().getExecutorMemoryStatus()

Executor ids: sc.sc().getExecutorIds()

Storage info: sc.sc().getRddStorageInfo()

... you can try to find more useful info there; a small Java sketch of reading some of these follows below.
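
For example, here is a minimal Java sketch, assuming the same sc field as above, that reads the executor memory status and RDD storage info through the underlying Scala SparkContext. The Scala map is converted to a Java map with scala.collection.JavaConverters (available when the Scala library is on the classpath, which it always is for a Spark application):

import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.RDDInfo;

import scala.Tuple2;
import scala.collection.JavaConverters;

public class ScalaContextInfo {

    // sc is the same JavaSparkContext field used in the snippet above
    public static void printClusterInfo(JavaSparkContext sc) {
        // getExecutorMemoryStatus() returns a Scala Map[String, (maxMemory, remainingMemory)];
        // convert it to a java.util.Map to iterate over it comfortably.
        Map<String, Tuple2<Object, Object>> memoryStatus =
                JavaConverters.mapAsJavaMapConverter(sc.sc().getExecutorMemoryStatus()).asJava();
        for (Map.Entry<String, Tuple2<Object, Object>> entry : memoryStatus.entrySet()) {
            System.out.println("Executor " + entry.getKey()
                    + ": max memory = " + entry.getValue()._1()
                    + ", remaining = " + entry.getValue()._2());
        }

        // getRddStorageInfo() returns a plain array, so no conversion is needed.
        for (RDDInfo rddInfo : sc.sc().getRddStorageInfo()) {
            System.out.println("RDD " + rddInfo.name()
                    + ": cached partitions = " + rddInfo.numCachedPartitions()
                    + ", memory used = " + rddInfo.memSize()
                    + ", disk used = " + rddInfo.diskSize());
        }
    }
}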

Upvotes: 3

Elkhan Dadashov

Reputation: 2817

You can also get the Spark job status without using the Spark Job History server. You can use SparkLauncher 2.0.1 (the Spark 1.6 version works too) to launch your Spark job from a Java program:

SparkAppHandle appHandle = sparkLauncher.startApplication();

You can also pass a listener to the startApplication() method:

SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);

The listener has two methods that inform you about job state changes and info changes.

I implemented it using a CountDownLatch, and it works as expected. This is for SparkLauncher version 2.0.1 and it works in yarn-cluster mode too.

...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);
...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;
    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }
    @Override
    public void infoChanged(SparkAppHandle handle) {}
    @Override
    public void run() {}
}
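
For completeness, a minimal sketch of how the sparkLauncher used above could be built; the jar path, main class, and master/deploy mode are hypothetical placeholders to adapt to your setup:

import org.apache.spark.launcher.SparkLauncher;

SparkLauncher sparkLauncher = new SparkLauncher()
        .setAppResource("/path/to/my-spark-job.jar")   // hypothetical jar location
        .setMainClass("com.example.MySparkJob")        // hypothetical main class
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g");

The handle returned by sparkLauncher.startApplication(sparkAppListener) is then tracked exactly as in the snippet above.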

Upvotes: 1

jozh

Reputation: 2692

There's an (almost) undocumented REST API feature that delivers almost everything you can see on the Spark UI:

http://<sparkMasterHost>:<uiPort>/api/v1/...

For a local installation you can start from here:

http://localhost:8080/api/v1/applications

You can find the possible endpoints here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
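
If you want to consume it programmatically, here is a minimal Java sketch using only the JDK's HttpURLConnection; it assumes a local standalone master on port 8080, so adjust the host/port to your setup:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SparkRestStatus {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port: point this at your master, driver, or history server UI.
        URL url = new URL("http://localhost:8080/api/v1/applications");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        // Read the response body into a string.
        StringBuilder json = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
        }

        // The response is a JSON array of applications; parse it with any JSON
        // library (Jackson, Gson, ...) to extract the application and job status.
        System.out.println(json);
    }
}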

Upvotes: 2

Daniel Darabos

Reputation: 27455

It is not similar to a REST API, but you can track the status of jobs from inside the application by registering a SparkListener with SparkContext.addSparkListener. It goes something like this:

sc.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted) = {
    if (event.stageInfo.stageId == myStage) {
      println(s"Stage $myStage is done.")
    }
  }
})

Upvotes: 5

maasg

Reputation: 37435

There's an (almost) undocumented REST API feature on the Spark UI that delivers metrics about the job and performance.

You can access it with:

http://<driverHost>:<uiPort>/metrics/json/

(uiPort is 4040 by default)

Upvotes: 1
