Reputation: 21615
During execution of my Spark program, YARN sometimes kills containers (executors) with the message that the memory limit was exceeded (the reason for this is still a mystery to me). My program does recover, with Spark re-executing the task by spawning a new container. However, a task in my program also creates some intermediate files on disk, and when a container is killed those files are left behind. Is there a way I can catch the executor kill as an exception so that I can delete the intermediate files left behind? Obviously, the exception-handling code would also need to run on the same node the executor was running on, so that I can delete the files from there.
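For illustration, the task-side file creation I am describing looks roughly like this (paths and names are made up):

import java.nio.file.Files

// Each task writes an intermediate file to the local disk of the executor
// it runs on. If YARN kills the container mid-task, the file is orphaned.
val processed = sc.parallelize(1 to 100, 4).mapPartitions { iter =>
  val tmp = Files.createTempFile("intermediate-", ".dat")  // executor-local disk
  // ... write partial results to tmp and read them back ...
  Files.deleteIfExists(tmp)  // this cleanup runs only if the task finishes normally
  iter
}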
Upvotes: 2
Views: 3569
Reputation: 29165
Adding on top of @Taras Matyashovskyy's answer:
You can use SparkListener and intercept its executor-related events.
Below is the list of available listener events:
SparkListenerApplicationStart
SparkListenerJobStart
SparkListenerStageSubmitted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerTaskEnd
SparkListenerStageCompleted
SparkListenerJobEnd
SparkListenerApplicationEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerBlockUpdated
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
Example: HeartbeatReceiver.scala
/**
 * Lives in the driver to receive heartbeats from executors.
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.addSparkListener(this) ...
Please have a look at the removal reason in SparkListenerExecutorRemoved, which may suit your case (I haven't tried it).
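A minimal sketch of such a listener (not tested, and the path layout is an assumption): it assumes the intermediate files are written to shared storage, e.g. HDFS, under a per-executor prefix like /tmp/myjob/<executorId>, so the driver can reach them. Keep in mind a SparkListener runs on the driver, so it cannot touch an executor's local disk directly.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

class CleanupListener(sc: SparkContext) extends SparkListener {
  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit = {
    // removed.reason carries the removal message, e.g. the memory-limit kill
    println(s"Executor ${removed.executorId} removed: ${removed.reason}")
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dir = new Path(s"/tmp/myjob/${removed.executorId}")  // hypothetical layout
    if (fs.exists(dir)) fs.delete(dir, true)  // recursive delete of leftovers
  }
}

sc.addSparkListener(new CleanupListener(sc))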
Upvotes: 4
Reputation: 426
As an option, you can try to use the SparkListener functionality. You can create your own class that implements the SparkListener interface to hook into the available events, which are quite self-explanatory. Then you need to add that custom listener to the SparkContext.
Two options are available:
SparkContext.addSparkListener(<your custom listener>)
the spark.extraListeners property (more info here: http://spark.apache.org/docs/latest/configuration.html#available-properties)
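A short registration sketch for both options, reusing the hypothetical CleanupListener from the answer above:

// Option 1: register programmatically on the driver
sc.addSparkListener(new CleanupListener(sc))

// Option 2: register by fully qualified class name at submit time; Spark then
// instantiates the listener itself, so it needs a zero-argument (or
// SparkConf-accepting) constructor rather than the SparkContext one used above:
// spark-submit --conf spark.extraListeners=com.example.CleanupListener ...

Upvotes: 5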