Reputation: 99
I am trying to log the execution time of each mapPartitions operation on an RDD using code like this (in Scala):
rdd.mapPartitions { partition =>
  val startTime = Calendar.getInstance().getTimeInMillis
  val result = partition.map { element =>
    [...]
  }
  val endTime = Calendar.getInstance().getTimeInMillis
  logger.info("Partition time " + (endTime - startTime) + " ms")
  result
}
The problem is that the "partition time" is logged immediately, before the map operation actually executes, so I always get a time of about 2 ms.
I noticed this by watching the Spark Web UI: in the log file, the line with the execution time appears immediately after the task starts, not at the end as expected.
Can someone explain why? Inside mapPartitions the code should execute sequentially, or am I wrong?
Thanks
Regards Luca
Upvotes: 1
Views: 1651
Reputation: 27373
partition inside of mapPartitions is an Iterator (for an RDD[T], an Iterator[T]), and an Iterator is evaluated lazily in Scala, i.e. only when it is consumed. This has nothing to do with Spark's lazy evaluation!
Calling partition.size will trigger the evaluation of your mapping, but it will also consume the Iterator (because an Iterator is only traversable once). An example:
val it = Iterator(1, 2, 3)
it.size    // 3 -- consumes the iterator
it.isEmpty // true, because size already exhausted it
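To see that the mapping itself is deferred, here is a minimal plain-Scala sketch (no Spark involved):
val lazyMapped = Iterator(1, 2, 3).map { x => println("mapping " + x); x * 2 }
// nothing is printed yet: map on an Iterator only builds another lazy Iterator
lazyMapped.toVector // prints "mapping 1" ... "mapping 3" and returns Vector(2, 4, 6)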
What you can do is convert the Iterator to a non-lazy collection type:
rdd.mapPartitions { partition =>
  val startTime = Calendar.getInstance().getTimeInMillis
  val result = partition.map { element =>
    [...]
  }.toVector // now the mapping is actually evaluated
  val endTime = Calendar.getInstance().getTimeInMillis
  logger.info("Partition time " + (endTime - startTime) + " ms")
  result.toIterator
}
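If buffering a whole partition with toVector uses too much memory, one alternative is to wrap the mapped Iterator so the end time is logged once the iterator has been fully consumed. This is only a sketch of that idea; the timed helper below is hypothetical, not part of Spark:
// Hypothetical helper: logs the elapsed time once the wrapped iterator is exhausted.
def timed[T](it: Iterator[T], startTime: Long): Iterator[T] = new Iterator[T] {
  private var logged = false
  def hasNext: Boolean = {
    val more = it.hasNext
    if (!more && !logged) {
      logged = true
      logger.info("Partition time " + (System.currentTimeMillis() - startTime) + " ms")
    }
    more
  }
  def next(): T = it.next()
}

rdd.mapPartitions { partition =>
  val startTime = System.currentTimeMillis()
  timed(partition.map { element =>
    [...]
  }, startTime)
}
Since Spark itself consumes the returned iterator while writing the task output, the logged time still covers the per-element work, but the partition stays streamed instead of being materialized.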
EDIT: Note that you can use System.currentTimeMillis() (or even System.nanoTime()) instead of Calendar.
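For example, a minimal sketch with System.nanoTime(), which is monotonic and therefore safer for elapsed-time measurements than wall-clock millis:
val t0 = System.nanoTime()
// ... the work to measure ...
val elapsedMs = (System.nanoTime() - t0) / 1e6 // nanoseconds to milliseconds
logger.info("Partition time " + elapsedMs + " ms")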
Upvotes: 4