Reputation: 125
I want to submit a job in a cluster environment with a timeout parameter. Is there a way to make Spark kill a running job if it exceeds the allowed duration?
Upvotes: 7
Views: 3485
Reputation: 11
This works if you're on a version that supports SparkListener and cancelling a job with a reason:
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.JobExecutionStatus
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.annotation.tailrec

val sparkJobDurationListener = new SparkListener() {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Poll the job status every `heartbeat` until it finishes or `maxDuration` is reached.
    @tailrec
    def checkJob(jobId: Int,
                 maxDuration: Duration = 2.hours,
                 currentDuration: Duration = 0.seconds,
                 heartbeat: Duration = 10.seconds): Unit = {
      val statusTracker = spark.sparkContext.statusTracker
      val isRunning = statusTracker.getJobInfo(jobId) match {
        case Some(js) => js.status() == JobExecutionStatus.RUNNING
        case None     => false
      }
      if (isRunning && currentDuration < maxDuration) {
        Thread.sleep(heartbeat.toMillis)
        checkJob(jobId, maxDuration, currentDuration + heartbeat, heartbeat)
      } else if (isRunning) {
        // Still running past the allowed duration: cancel it with an explanatory reason.
        spark.sparkContext.cancelJob(jobId, s"Job duration $currentDuration exceeded $maxDuration")
      }
    }
    // Run the watchdog off the listener thread so it does not block event processing.
    Future(checkJob(jobStart.jobId))
  }
}

spark.sparkContext.addSparkListener(sparkJobDurationListener)
Upvotes: 0
Reputation: 1131
You can use the YARN REST API to kill the Spark application from your service. I am using the following code, which relies on the Apache HttpClient library, to stop long-running Spark applications.
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
import org.apache.http.util.EntityUtils
import scala.util.parsing.json.JSONObject

def killApplication(applicationId: String): Boolean = {
  // PUT the KILLED state to the ResourceManager's application state endpoint.
  val appKillPut = new HttpPut(s"http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/$applicationId/state")
  val json = new JSONObject(Map("state" -> "KILLED"))
  val params = new StringEntity(json.toString(), "UTF-8")
  params.setContentType("application/json")
  appKillPut.addHeader("Content-Type", "application/json")
  appKillPut.addHeader("Accept", "*/*")
  appKillPut.setEntity(params)
  println(s"Request payload ${json.toString}")

  val client: CloseableHttpClient = HttpClientBuilder.create().build()
  val response: CloseableHttpResponse = client.execute(appKillPut)
  val responseBody = EntityUtils.toString(response.getEntity)
  println(s"Response payload ${responseBody}")

  val statusCode: Int = response.getStatusLine.getStatusCode
  if (statusCode == 200 || statusCode == 201 || statusCode == 202) {
    println(s"Successfully stopped the application : ${applicationId}")
    true
  } else {
    false
  }
}
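If you also need the timeout itself, one possible way to drive killApplication is to poll the ResourceManager for the application's elapsed time. This is only a sketch: it assumes the /ws/v1/cluster/apps/{appId} response exposes an elapsedTime field in milliseconds (check your Hadoop version) and reuses the same host placeholder and JSON library as above.

import scala.io.Source
import scala.util.parsing.json.JSON

def killIfTooLong(applicationId: String, maxDurationMs: Long): Boolean = {
  val body = Source.fromURL(s"http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/$applicationId").mkString
  // Pull app.elapsedTime (milliseconds, assumed field name) out of the ResourceManager response.
  val elapsedMs = for {
    parsed  <- JSON.parseFull(body)
    app     <- parsed.asInstanceOf[Map[String, Any]].get("app")
    elapsed <- app.asInstanceOf[Map[String, Any]].get("elapsedTime")
  } yield elapsed.asInstanceOf[Double].toLong
  // Kill only when the application has been running longer than the allowed duration.
  elapsedMs.exists(_ > maxDurationMs) && killApplication(applicationId)
}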
Hope this helps.
Ravi
Upvotes: 0
Reputation: 22711
As of Spark 2.1.0, there is no built-in solution (it would be a very good feature to add!).
You can play with the speculation
feature to re-launch long-running tasks, and with spark.task.maxFailures
to give up on a job whose tasks have been re-launched too many times; the relevant settings are sketched below.
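A sketch of those settings, assuming a SparkConf you build yourself; note that speculation only re-launches tasks that are slow relative to their peers, it does not enforce an absolute timeout.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")          // re-launch tasks that lag behind their peers
  .set("spark.speculation.multiplier", "2")  // "slow" = more than 2x the median task duration
  .set("spark.task.maxFailures", "4")        // fail the job after 4 failures of the same task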
But this is absolutely not clean: Spark is missing a real "circuit breaker" to stop a long-running task (such as the newbie SELECT * FROM DB).
On the other hand, you could use the REST API exposed by the Spark web UI:
1) Get the running jobs: GET http://SPARK_CLUSTER_PROD/api/v1/applications/application_1502112083252_1942/jobs?status=running
(this gives you an array with a submissionTime
field that you can use to find long-running jobs)
2) Kill the job: POST http://SPARK_CLUSTER_PROD/stages/stage/kill/?id=23881&terminate=true
for each of the job's stages (a sketch of both calls follows the list).
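A minimal sketch of those two calls in Scala, using only java.net and scala.io; SPARK_CLUSTER_PROD and the application/stage ids are the placeholders from above, and parsing the returned JSON is left to whatever JSON library you already use.

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// 1) List running jobs; the JSON array includes a submissionTime per job that you
//    can compare against the current time to find jobs exceeding your timeout.
val runningJobsJson: String = Source.fromURL(
  "http://SPARK_CLUSTER_PROD/api/v1/applications/application_1502112083252_1942/jobs?status=running"
).mkString

// 2) Kill one stage of an offending job through the web UI endpoint.
def killStage(stageId: Int): Int = {
  val url = new URL(s"http://SPARK_CLUSTER_PROD/stages/stage/kill/?id=$stageId&terminate=true")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  val status = conn.getResponseCode // the request is actually sent here
  conn.disconnect()
  status
}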
I believe Spark has a hidden API too that you can try to use.
Upvotes: 1