ksceriath
ksceriath

Reputation: 195

When is the data in a spark partition actually realised?

I am analysing the performance of my spark application in case of small datasets. I have a lineage graph which looks something like following:

someList.toDS()
.repartition(x)
.mapPartitions(func1)
.mapPartitions(func2)
.mapPartitions(func3)
.filter(cond1)
.count()

I have a cluster of 2 nodes with 8 cores on each. Executors are configured to use 4 cores. So, when the application is running four executors come up using 4 cores each.

I am observing at least (and usually only) 1 task on each thread (i.e. 16 tasks in total) takes a lot longer than other tasks. For example, in one run these tasks are taking approx 15-20 seconds, compared to other tasks running in a second or less.

On profiling the code, I found the bottleneck to be in func3 above:

def func3 = (partition: Iterator[DomainObject]) => {
  val l = partition.toList          // This takes almost all of the time
  val t = doSomething(l)
}

The conversion from an Iterator to a List takes up almost all of the time.

The partition size is very small (even less than 50 in some cases). Even then, the size of partition is almost consistent across different partitions, but only one task per thread takes up the time.

I would have assumed that by the time func3 runs on the executor for a task, the data within that partition would already be present on the executor. Is this not the case? (Does it iterate over the entire dataset to filter out data for this partition somehow, during the execution of func3?!)

Else, why should the conversion from an Iterator over less than fifty objects to a List take up that much time?

Other thing I note (not sure if that is relevant) is the GC time (as per spark ui) for these tasks is also unusually consistent 2s for all of these sixteen tasks, as compared to other tasks (even then, 2s<<20s)

Update: Following is how the event timeline looks for the four executors: spark ui : event timeline for the stage

Upvotes: 1

Views: 265

Answers (2)

ksceriath
ksceriath

Reputation: 195

It appears that the data in the partition is available as soon as the task starts executing (or, at least there is not any significant cost in iterating through that data, as the question would make it seem.)

The bottleneck in above code is actually in func2 (which I did not investigate properly!), and is because of the lazy nature of the iterators in scala. The problem is not related to spark at all.

Firstly, the functions in the mapPartitions calls above appear to get chained and called like so: func3( func2( func1(Iterator[A]) ) ) : Iterator[B]. So the Iterator produced as output of func2 is fed to func3 directly.

Secondly, for above issue func1 (and func2) are defined as :

func1(x: Iterator[A]) => Iterator[B] = x.map(...).filter...

Since these take an iterator and map them to a different iterator, these are not executed right away. But when func3 is executed, partition.toList causes to map closure in func2 to get executed. On profiling, it appears that func3 took all the time, where instead func2 has the code slowing the application.

(Specific to above problem, func2 contains some serialising of case objects to a json string. It appears to execute some time-consuming implicit code, only for the first object on each thread. Since it happens once for each thread, each thread has just one task which takes very long, and explains the event timeline above.)

Upvotes: 0

partha_devArch
partha_devArch

Reputation: 454

  1. First realization is during repartition()
  2. Second is after the filter operation, where all the three mapPartitions starts execution (when the count action is called). Depending on your doSomething() in each of those functions it will depend how the DAG will be created and where it is taking time and accordingly you can optimize.

Upvotes: 1

Related Questions