VS_FF
VS_FF

Reputation: 2373

How exactly does Spark reshuffle RDD among Executors/Tasks in case of exec. failure or in dynamic shuffle?

I am considering adding a set of preemptable instances to the Worker pool of a Spark job that I run on Google Could Dataproc, but I am trying to understand what exactly would happen in case some of these instances get killed. I have witnessed what happens when Spark does some sort of a native operation, like with SparkSQL, etc. and it seems like it manages to preserve the 'resilience' element of the RDD.

But how exactly does it handle the RDD redistribution for RDDs that are processed by custom-written functions like .forEach() and .forEachPartition()? If a worker where a task like this is being processed is killed, what happens exactly?

Specifically, imagine the .forEachPartition() Java code that looks something like this:

public void test(JavaRDD<String> RDD)
{
    RDD.foreachPartition(new VoidFunction<Iterator<String>>(){
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Iterator<String> t) throws Exception
        {
            Queue<String> elementQ = new LinkedList<>();
            while (t.hasNext())
                elementQ.offer(t.next());

            while(elementQ.size() >0)
            {
                String curElement = elementQ.remove();
                System.out.println("Doing something with element " + curElement);
                boolean condition = false;
                if(condition)
                    elementQ.offer(curElement);
            }
        }});
}

When a task instance starts up initially, it takes all elements of the RDD that were assigned to it and puts them into a Queue. It then keeps traversing through this queue -- either removing the elements that have been processed or putting them back if they have to wait to be processed later, based on some internal logic.

If the worker that runs one of these tasks gets killed, what exactly happens? Would all RDD elements that were initially assigned to it be redistributed among other tasks on other workers? Or is there away to mark programmatically which elements have been 'processed' and which elements are pending?

I also noticed that if dynamic allocation is enabled using yarn shuffle service, at some point Spark starts to think that some tasks are taking too long to complete, and it attempts to redistribute the RDD among other tasks. This might potentially be very useful, but again, how exactly does this redistribution happen, and is there a way to control it programmatically inside the .forEachPartition function call?

Upvotes: 1

Views: 309

Answers (1)

Malina
Malina

Reputation: 98

Dynamic resource allocation in Spark does not redistribute the RDD among other tasks - it scales the number of executors to the workload.

Regarding using preemptible instances, in the case that an instance is preempted, the work on the instance is lost and reassigned to other instances, which would set back job progress.

Upvotes: 1

Related Questions