pashupati

Reputation: 107

Running a Java-based Spark Job on spark-jobserver

I need to run an aggregation Spark job on spark-jobserver, using low-latency contexts. I have this Scala runner, which runs the job by calling a method on a Java class.

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

object AggregationRunner extends SparkJob {
  def main(args: Array[String]) {
    val ctx = new SparkContext("local[4]", "spark-jobs")
    val config = ConfigFactory.parseString("")
    val results = runJob(ctx, config)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    SparkJobValid
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    // Wrap the Scala SparkContext so it can be passed to the Java class
    val context = new JavaSparkContext(sc)
    val aggJob = new ServerAggregationJob()
    val input = config.getString("input.string").split(" ")
    val id = input(0)
    val field = input(1)
    aggJob.aggregate(context, id, field)
  }
}
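(The ServerAggregationJob class itself is not shown in the question; the only thing known from the call above is that it exposes an aggregate(JavaSparkContext, String, String) method. Purely as a hypothetical sketch of that shape, with an assumed String return type and placeholder body, it might look like this:)

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch only; the real implementation is not part of the question.
public class ServerAggregationJob implements java.io.Serializable {

    public String aggregate(JavaSparkContext context, String id, String field) {
        // Placeholder body: a real job would load data via the context and
        // run the actual aggregation instead of this trivial count.
        long count = context.parallelize(java.util.Arrays.asList(1, 2, 3)).count();
        return "aggregated " + field + " for " + id + " over " + count + " records";
    }
}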

However, I get the following error. I tried removing the content returned by the Java method so that it just returns a test string, but it still doesn't work:

{
  "status": "ERROR",
  "result": {
    "message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/single-context#1243999360]] after [10000 ms]",
    "errorClass": "akka.pattern.AskTimeoutException",
    "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)", "akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)", "scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)", "akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)", "akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)", "akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)", "java.lang.Thread.run(Thread.java:745)"]
  }
}

I am not too sure why there is a timeout since I am only returning a string.

EDIT

So I figured out that the issue was occurring because I was using a Spark context that had been created before I updated the JAR. However, now that I try to use JavaSparkContext inside the Spark job, the error shown above comes back.

What would be a permanent way to get rid of this error?

Also, could the fact that I am running a heavy Spark job in a local Docker container be a plausible reason for the timeout?

Upvotes: 1

Views: 630

Answers (1)

SanthoshPrasad

Reputation: 1175

To resolve the ask timeout issue, add or change the following properties in the job server configuration file.

spray.can.server {
  idle-timeout = 210 s
  request-timeout = 200 s
}
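(These particular values are only examples; the key point is that idle-timeout should be kept larger than request-timeout, since spray rejects configurations where it is not. The block goes in the job server's .conf file, e.g. a copy of local.conf.template in a standard deployment, and the job server has to be restarted for the new timeouts to take effect.)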

For more information, take a look at https://github.com/spark-jobserver/spark-jobserver/blob/d1843cbca8e0d07f238cc664709e73bbeea05f2c/doc/troubleshooting.md

Upvotes: 2
