Reputation: 816
I'm running long jobs (3+ hours) on a large Spark cluster in YARN mode. The worker VMs running Spark are hosted on Google Cloud Dataproc, and most of them can be destroyed during execution (preemptible VMs, which cost less).
When this happens, the job fails because tasks fail on the destroyed worker, with this error in the container logs of the failing worker:
Executor is not registered
I've tried setting spark.task.maxFailures
to 1000, but this doesn't seem to be very effective: even though the job finishes, the tasks don't seem to be automatically redistributed, and the computation for the tasks assigned to that specific worker seems to roll back to the initial stage.
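For reference, I'm setting it at submission time, roughly like this (the class and jar names below are just placeholders):
spark-submit \
  --conf spark.task.maxFailures=1000 \
  --class com.example.MyJob \
  gs://my-bucket/my-job.jar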
Would there be a way of having a more fault-tolerant configuration that simply excludes unresponsive executors and reassigns their tasks?
I could include the ResourceManager, NodeManager, and container logs if asked, but I don't think they would be relevant.
Upvotes: 3
Views: 463
Reputation: 2158
This appears to be a regression in how pre-emptible workers leave the cluster.
The issue is not just failure intolerance. Preemptible workers are created and destroyed continuously throughout the cluster's lifecycle. Each time a worker leaves, YARN waits 15 minutes for a heartbeat before detecting the failure and recreating the container. This can make your job run considerably longer.
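(For context, that wait corresponds to YARN's NodeManager liveness expiry, yarn.nm.liveness-monitor.expiry-interval-ms in yarn-site.xml. As a rough sketch only, and assuming your gcloud version supports the --properties flag with the yarn: prefix, you could lower it at cluster creation time:)
gcloud beta dataproc clusters create my-cluster ... \
  --properties yarn:yarn.nm.liveness-monitor.expiry-interval-ms=60000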
We will fix this in the next release.
Workaround:
The following will force the worker to leave the cluster on shutdown.
Create the following script and upload it to a GCS bucket:
#!/bin/sh
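# Patch the Dataproc agent's init script so that, on shutdown, the agent is
# sent SIGUSR2 (and given 5 seconds) to deregister this worker from the
# cluster before the VM disappears.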
sudo sed -i "s/.*Stopping google-dataproc-agent.*/start-stop-daemon --stop --signal USR2 --quiet --oknodo --pidfile \${AGENT_PID_FILE}; sleep 5s/" \
/etc/init.d/google-dataproc-agent
Let's say you uploaded it to gs://my-bucket/fix.sh
Now recreate your cluster with this initialization action:
gcloud beta dataproc clusters create my-cluster ... \
--initialization-actions gs://my-bucket/fix.sh
You can verify this by SSHing into the master node and setting up a watch on the YARN node list:
gcloud compute ssh my-cluster-m
watch yarn node -list
In another terminal, issue a cluster update command that reduces the number of workers, and verify that the number of YARN nodes changes accordingly.
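For example, something like the following, assuming your gcloud release supports resizing via clusters update (exact flags may differ):
gcloud beta dataproc clusters update my-cluster --num-workers 2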
Upvotes: 5