Gem741

Reputation: 125

Limit apache spark job running duration

I want to submit a job to a cluster environment with a timeout parameter. Is there a way to make Spark kill a running job if it exceeds the allowed duration?

Upvotes: 7

Views: 3485

Answers (3)

Costas Piliotis

Reputation: 11

This works if you're on a version that supports listeners:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.JobExecutionStatus
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.annotation.tailrec
  
// `spark` is the active SparkSession. For every job that starts, the listener
// spawns a background Future that polls the job's status every `heartbeat`
// and cancels the job once `maxDuration` is exceeded.
val sparkJobDurationListener = new SparkListener() {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    @tailrec
    def checkJob(jobId: Int,
                 maxDuration: Duration = 2.hours,
                 currentDuration: Duration = 0.seconds,
                 heartbeat: Duration = 10.seconds): Unit = {
      val statusTracker = spark.sparkContext.statusTracker
      val isRunning = statusTracker.getJobInfo(jobId) match {
        case Some(js) => js.status() == JobExecutionStatus.RUNNING
        case None => false
      }

      if (isRunning && currentDuration < maxDuration) {
        // Still within budget: sleep one heartbeat and check again.
        Thread.sleep(heartbeat.toMillis)
        checkJob(jobId, maxDuration, currentDuration + heartbeat, heartbeat)
      }
      else if (isRunning) {
        // Over budget: cancel the job with an explanatory reason
        // (the two-argument cancelJob is available on newer Spark versions).
        spark.sparkContext.cancelJob(jobId, s"Job duration $currentDuration exceeded $maxDuration")
      }
    }
    Future(checkJob(jobStart.jobId))
  }
}
spark.sparkContext.addSparkListener(sparkJobDurationListener)

Upvotes: 0

Ravikumar

Reputation: 1131

You can use the YARN REST API to kill the Spark application from your service. I am using the following code to stop long-running Spark applications; it uses the Apache HttpClient library.

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
import org.apache.http.util.EntityUtils
import scala.util.parsing.json.JSONObject  // or your preferred JSON library

def killApplication(applicationId: String): Boolean = {
  // PUT the KILLED state to the YARN ResourceManager REST API.
  val appKillPut = new HttpPut(s"http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/$applicationId/state")
  val json = new JSONObject(Map("state" -> "KILLED"))

  val params = new StringEntity(json.toString(), "UTF-8")
  params.setContentType("application/json")

  appKillPut.addHeader("Content-Type", "application/json")
  appKillPut.addHeader("Accept", "*/*")
  appKillPut.setEntity(params)

  println(s"Request payload ${json.toString}")

  val client: CloseableHttpClient = HttpClientBuilder.create().build()
  val response: CloseableHttpResponse = client.execute(appKillPut)
  val responseBody = EntityUtils.toString(response.getEntity)
  println(s"Response payload ${responseBody}")

  val statusCode: Int = response.getStatusLine.getStatusCode
  if (statusCode == 200 || statusCode == 201 || statusCode == 202) {
    println(s"Successfully stopped the application : ${applicationId}")
    true
  } else {
    false
  }
}
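
If you need this to act as a timeout rather than a manual kill, one possible wiring (my own sketch, not part of the original answer) is to schedule the call from the driver, where the SparkContext is in scope and killApplication above is visible; the scheduler, method name and delay are illustrative:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkContext

// Hypothetical wiring: call killApplication() once the allowed duration elapses.
// If the application finishes earlier, the PUT simply targets an already-finished app.
val timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

def enforceTimeout(sc: SparkContext, timeoutMinutes: Long): Unit = {
  val appId = sc.applicationId  // e.g. "application_1502112083252_1942" on YARN
  timeoutScheduler.schedule(new Runnable {
    override def run(): Unit = killApplication(appId)
  }, timeoutMinutes, TimeUnit.MINUTES)
}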

Hope this helps.

Ravi

Upvotes: 0

Thomas Decaux

Reputation: 22711

As of Spark 2.1.0, there is no built-in solution (it would be a very good feature to add!).

You can play with the speculation feature to re-launch long tasks, and with spark.task.maxFailures to kill jobs whose tasks have been re-launched too many times.

But this is absolutely not clean; Spark is missing a real "circuit breaker" to stop a long task (such as the noob SELECT * FROM DB).
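
For reference, a sketch of what that speculation-based workaround could look like (the values are illustrative, not recommendations, and these settings must be applied before the SparkContext is created):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative values only; tune (or avoid) for your workload.
val conf = new SparkConf()
  .set("spark.speculation", "true")          // re-launch suspiciously slow tasks
  .set("spark.speculation.multiplier", "3")  // "slow" = 3x the median task duration
  .set("spark.speculation.quantile", "0.9")  // only after 90% of tasks have finished
  .set("spark.task.maxFailures", "4")        // abort the job after 4 failures of one task

val spark = SparkSession.builder().config(conf).getOrCreate()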

Alternatively, you could use the Spark web UI's REST API:

1) Get running jobs: GET http://SPARK_CLUSTER_PROD/api/v1/applications/application_1502112083252_1942/jobs?status=running

(this will give you an array with a submissionTime field that you can use to find long-running jobs)

2) Kill the job: POST http://SPARK_CLUSTER_PROD/stages/stage/kill/?id=23881&terminate=true for each of the job's stages (see the sketch below).
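
Putting those two calls together, a rough sketch of the poll-and-kill loop (this is my illustration, not an official client; it reuses Apache HttpClient, and the host, application id, killStage helper, JSON parsing, and 2-hour limit are placeholders):

import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils

// Placeholders: adjust the UI host, application id and limit to your cluster.
val uiBase    = "http://SPARK_CLUSTER_PROD"
val appId     = "application_1502112083252_1942"
val maxMillis = 2L * 60 * 60 * 1000  // 2 hours

val client = HttpClientBuilder.create().build()

// 1) List running jobs; each entry carries a submissionTime you can compare to now.
val get      = new HttpGet(s"$uiBase/api/v1/applications/$appId/jobs?status=running")
val jobsJson = EntityUtils.toString(client.execute(get).getEntity)
// ... parse jobsJson with your JSON library of choice, keep the jobs whose
// age exceeds maxMillis, and collect their stage ids ...

// 2) Kill every stage of each offending job through the UI endpoint.
def killStage(stageId: Int): Unit = {
  val post = new HttpPost(s"$uiBase/stages/stage/kill/?id=$stageId&terminate=true")
  client.execute(post).close()
}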

I believe Spark also has a hidden API you can try to use.

Upvotes: 1
