Reputation: 2480
All right, so I've asked a somewhat similar question related to how Spark handles exceptions internally, but the example I had back then wasn't really clear or complete. An answer there pointed me in some direction but I can't really explain some things.
I've set up a dummy Spark Streaming app, and in the transform stage I have a Russian-roulette expression which may or may not throw an exception. If an exception is thrown, I stop the Spark streaming context. That's it: no other logic, no RDD transformation.
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.util.Random

object ImmortalStreamingJob extends App {
  val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
  val ssc  = new StreamingContext(conf, Seconds(1))

  // 100 small RDDs of 10 elements each, fed in through a queue stream
  val elems = (1 to 1000).grouped(10)
    .map(seq => ssc.sparkContext.parallelize(seq))
    .toSeq
  val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

  // russian roulette: roughly one batch in six blows up
  val transformed = stream.transform { rdd =>
    try {
      if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
      else println("lucky bastard")
      rdd
    } catch {
      case e: Throwable =>
        println(s"stopping streaming context: $e")
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }
  }

  transformed.foreachRDD { rdd =>
    println(rdd.collect().mkString(","))
  }

  ssc.start()
  ssc.awaitTermination()
}
Running this in IntelliJ will throw the exception at some point. The fun part:
- if the exception is thrown while the very first RDD is being processed, the Spark context is stopped and the app dies, which is what I want
- if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops, which is not what I want

Why does the app hang instead of dying in the second case?
I'm running Spark 2.1.0 on Scala 2.11.8. Removing the try-catch solves the problem (Spark stops by itself). Moving the try-catch inside foreachRDD also solves the problem.
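For illustration, this is a minimal sketch of how I read the second variant (the exact rearrangement is my own; the point is just that the handling happens inside the action, on the driver, where ssc is in scope):

// hypothetical rearrangement, not necessarily the only reading:
// the roulette and its handler both live inside the action
stream.foreachRDD { rdd =>
  try {
    if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
    println(rdd.collect().mkString(","))
  } catch {
    case e: Throwable =>
      println(s"stopping streaming context: $e")
      ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}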
However I'm looking for an answer that can help me understand what's going on in this particular example.
Upvotes: 12
Views: 1512
Reputation: 2480
I dug into this issue for the past few weeks, so I figured it would be nice to share what I've learned with the help of both the StackOverflow community and the Apache Spark community:
[Don't stop the StreamingContext from inside a transformation]
Do not stop the StreamingContext inside a Spark transformation / action like I'm doing in this example. If you want to do that, just do it in a separate thread (see the sketch after this list). Otherwise you might stumble upon deadlocks or some undefined behaviour. Thanks to @Shinzong Xhu for that.
[Let the failure surface instead of swallowing it]
If you simply let the exception escape the transformation, Spark reports it as a SparkException in which your own exception could be wrapped. Your streaming app won't die but at least the streaming tasks will fail. Thank you @Vidya for inspiration on this one.
UPDATE [Don't forget to close external resources]
Be sure to check whether you need to stop any other resources unrelated to Spark. For example, if you stop the streaming job but the driver uses an ActorSystem from Akka to perform some HTTP calls, don't forget to terminate the system or your app will hang. This might not seem related to Spark, but don't forget about it.
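Here is a rough sketch of the separate-thread idea, reusing ssc, stream and Random from the example in the question (the thread name is made up, and this is only one way to do it):

// the stop call is handed off to a separate thread instead of being made
// directly inside the transformation
val transformedSafely = stream.transform { rdd =>
  try {
    if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
    rdd
  } catch {
    case e: Throwable =>
      new Thread("streaming-shutdown") {
        override def run(): Unit =
          ssc.stop(stopSparkContext = true, stopGracefully = false)
      }.start()
      throw e
  }
}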
Thanks again for the other answers and I hope you'll find this information helpful.
Upvotes: 0
Reputation: 30310
You will only see exceptions manifest themselves in actions (like foreachRDD in this case) rather than transformations (like transform in this case) because transformations are lazy: they don't execute anything until an action forces them. This means your transformations won't even occur until the action. The reason why this is necessary demands changing your mental model of how distributed processing works.
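As a tiny, hypothetical illustration of that laziness before moving on to the distributed picture (sc here is just any live SparkContext, not part of the question's code):

// map only records the transformation; nothing runs and nothing throws yet
val broken = sc.parallelize(1 to 10).map(x => 10 / (x - x))
// the failure only surfaces here, when an action forces evaluation
broken.collect()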
Consider a conventional, single-threaded program. Code proceeds line-by-line, and if an exception is thrown and not handled, subsequent lines of code just don't execute. In a distributed system where the same Spark transformations are running in parallel on multiple machines (and at different paces), what should happen when an exception is thrown? It's not so simple since the exception on one machine is independent of the code proceeding on other machines, which is how you want it. To want all the independent tasks spread throughout a cluster to just shut down on an exception is simply single-machine thinking that doesn't translate to a distributed paradigm. How is the driver supposed to deal with that?
According to Matei Zaharia, now of Databricks and one of the creators of Spark back at Berkeley, "Exceptions should be sent back to the driver program and logged there (with a SparkException thrown if a task fails more than 4 times)." (Incidentally, this default number of retries can be changed with spark.task.maxFailures.) So if Log4J is properly configured on the executors, the exception will be logged there; then it will be serialized and sent back to the driver, who will try again 3 more times by default.
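If you want to experiment with that retry count, it is an ordinary configuration entry; here is a small, illustrative sketch (the values are arbitrary, and the variable name is mine):

import org.apache.spark.SparkConf

// spark.task.maxFailures = number of attempts before the job is aborted
// (default 4 on a cluster). Plain local[*] gives each task a single attempt;
// a master of the form local[N, maxFailures] re-enables retries locally.
val retryingConf = new SparkConf()
  .setAppName("fun-spark")
  .setMaster("local[4, 4]")            // 4 threads, up to 4 attempts per task
  .set("spark.task.maxFailures", "4")  // the knob referred to above, for cluster mode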
In your particular situation, I would guess you have a couple of things going on. First, you are running on a single machine, which will give a misleading picture of how exception handling works in a distributed model. Second, you are stopping the context prematurely. Stopping the context is an extremely destructive operation, which includes stopping all your listeners and the DAGScheduler. Frankly, I don't know how you can expect Spark to wrap everything up so neatly when you've basically turned out the lights.
Finally, I would mention that a more elegant exception handling model might be executing your transformations inside a Try. You will end up with potentially more cumbersome code in that your transformations will return RDD[Try[T]] or DStream[Try[T]], which means you will have to handle the Success and Failure cases for each element. But you will be able to propagate success and error information downstream with all the benefits a monad provides, including mapping RDD[Try[A]] => RDD[Try[B]] and even using for comprehensions (by virtue of flatMap).
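A rough sketch of what that could look like; the function and value names and the toy parsing step are mine, not something from the question:

import scala.util.{Failure, Success, Try}
import org.apache.spark.rdd.RDD

// each element carries its own outcome instead of failing the whole task
def riskyParse(s: String): Int = s.trim.toInt

def parseAll(lines: RDD[String]): RDD[Try[Int]] =
  lines.map(line => Try(riskyParse(line)))

// mapping RDD[Try[A]] => RDD[Try[B]]: failures simply pass through
def doubled(parsed: RDD[Try[Int]]): RDD[Try[Int]] =
  parsed.map(_.map(_ * 2))

// downstream you handle both cases explicitly
def report(parsed: RDD[Try[Int]]): Unit =
  parsed.collect().foreach {
    case Success(n) => println(s"ok: $n")
    case Failure(e) => println(s"failed: ${e.getMessage}")
  }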
Upvotes: 4
Reputation: 2345
First, the reason it works when the exception handling is moved to the foreachRDD is that the body of foreachRDD is where the exception shows up. Transformations are evaluated lazily, when an action is called (e.g. collect). So having caught the error, calling ssc.stop correctly aborts the job; ssc is in scope and everything is nice and deterministic.
Now, when you call ssc.stop in a transformation, things get a little more confusing. The context isn't actually in scope from the body of the transformation, because the transformation is executed by a worker rather than the driver. It just happens to work for you at the moment because you are running all nice and locally. As soon as you move to a distributed environment, you will find your job won't run because you can't distribute the context to the executors.
When an exception is thrown from a transformation, the stack trace is sent to the driver, which will retry the task, but in your case you have stopped the context, so who knows what is going to happen? I'd guess that if the exception happens on the first RDD, the job is somehow killed because it's so soon, whether by fluke or by design. And I'd guess that it doesn't all get killed if the exception isn't in the first RDD transform, because you stop the context and somehow create a condition where the driver and/or the task(s) wait indefinitely.
Upvotes: 2