Gaglia88
Gaglia88

Reputation: 99

Apache Spark mapPartition strange behavior (lazy evaluation?)

I am trying to log the execution time of each mapPartition operation on a RDD using a code like this (in Scala):

rdd.mapPartitions{partition =>
   val startTime = Calendar.getInstance().getTimeInMillis
   result = partition.map{element =>
      [...]
   }
   val endTime = Calendar.getInstance().getTimeInMillis
   logger.info("Partition time "+(startTime-endTime)+ "ms")
   result
}

The problem is that it logs the "partition time" immediately, before it start to execute the map operation, so I always obtain a time like 2 ms.

I noticed it by watching the Spark Web UI, in the log file the row regarding the execution time appears immediately after the task started, not at the end as expected.

Someone is able to explain me why? Inside the mapPartitions the code should be executed linearly, or I am wrong?

Thanks

Regards Luca

Upvotes: 1

Views: 1651

Answers (1)

Raphael Roth
Raphael Roth

Reputation: 27373

partitions inside of mapPartitions is an Iterator[Row], and an Iterator is evaluated lazily in Scala (i.e. when the Iterator is consumed). This has nothing to to with Spark's lazy evauation!

Calling partitions.size will trigger the evaluation of your mapping, but will consume the Iterator (because it's only iterable once). An example

val it = Iterator(1,2,3)
it.size // 3
it.isEmpty // true

What you can do is to convert the Iterator to an non-lazy collection type:

rdd.mapPartitions{partition =>
   val startTime = Calendar.getInstance().getTimeInMillis
   result = partition.map{element =>
      [...]
   }.toVector // now the statements are evaluated
   val endTime = Calendar.getInstance().getTimeInMillis
   logger.info("Partition time "+(startTime-endTime)+ "ms")
   result.toIterator
}

EDIT: Note that you can use System.currentTimeMillis() (or even System.nanoTime()) instead of using Calendar.

Upvotes: 4

Related Questions