Reputation: 742
I have been reading a lot about the differences between map and mapPartitions, but I still have some doubts.
After reading, I decided to replace the map calls in my code with mapPartitions, because apparently mapPartitions is faster than map.
My question is whether that decision is right in scenarios like the following (comments show the previous code):
val reducedRdd = rdd.mapPartitions(partition => partition.map(r => (r.id, r)))
//val reducedRdd = rdd.map(r => (r.id, r))
  .reduceByKey((r1, r2) => r1.combineElem(r2))
  // .map(e => e._2)
  .mapPartitions(partition => partition.map(e => e._2))
Is my thinking right? Thanks!
Upvotes: 3
Views: 2139
Reputation: 1054
tl;dr mapPartitions will be fast in this case.
Consider a function:
def someFunc(row: Row): Row = {
  // do some processing on row
  // return new row
  row
}
Say we are processing 1 million records. With map, we will end up calling someFunc 1 million times.
That is basically 1 million virtual function calls, plus other kernel data structures created for the processing.
With mapPartitions, we would write this as:
rdd.mapPartitions { partIter =>
  partIter.map { row =>
    // do some processing on row
    // return new row
    row
  }
}
No virtual function calls or context switches here.
Hence mapPartitions will be faster.
Also, as mentioned in @moriarity007's answer, we need to factor in the object creation overhead involved in the operation when deciding which operator to use.
I would also recommend using DataFrame transformations and actions for the processing, where the compute can be even faster, since Spark's Catalyst optimizer optimizes your code and also takes advantage of code generation.
Upvotes: 1
Reputation: 2224
In your case, mapPartitions should not make any difference.
mapPartitions vs map
mapPartitions is useful when there is some common computation that we want to do once for each partition. Example:
rdd.mapPartitions { partition =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  partition.map { row =>
    (row.id, complicatedRowConverter(row))
  }
}
In the above example, we are creating a complicatedRowConverter function which is derived from some costly computation. This function will be the same for the entire RDD partition, so we don't need to recreate it again and again. The other way to do the same thing would be:
rdd.map { row =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  (row.id, complicatedRowConverter(row))
}
This will be slow because we are unnecessarily running the statement val complicatedRowConverter = <SOME-COSTLY-COMPUTATION> for every row.
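To make the difference between the two variants above concrete, here is a minimal plain-Scala sketch that counts how often the setup runs. It does not use Spark: a Seq of Seqs stands in for the partitions, and Row, buildConverter and setupCalls are hypothetical names invented for this illustration, with buildConverter playing the role of <SOME-COSTLY-COMPUTATION>.

```scala
// Plain-Scala simulation, no Spark dependency. All names are hypothetical.
object SetupCostDemo {
  final case class Row(id: Int, value: String)

  // Counts how often the (pretend) costly setup runs.
  var setupCalls = 0

  // Stand-in for <SOME-COSTLY-COMPUTATION>: builds the converter function.
  def buildConverter(): Row => String = {
    setupCalls += 1
    row => row.value.toUpperCase
  }

  // Two simulated partitions of three rows each.
  val partitions: Seq[Seq[Row]] = Seq(
    Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")),
    Seq(Row(4, "d"), Row(5, "e"), Row(6, "f"))
  )

  // map style: the setup sits inside the per-row function.
  def perRow(): (Seq[(Int, String)], Int) = {
    setupCalls = 0
    val out = partitions.flatMap { partition =>
      partition.iterator.map { row =>
        val convert = buildConverter()
        (row.id, convert(row))
      }
    }
    (out, setupCalls)
  }

  // mapPartitions style: the setup runs once, before iterating the partition.
  def perPartition(): (Seq[(Int, String)], Int) = {
    setupCalls = 0
    val out = partitions.flatMap { partition =>
      val convert = buildConverter()
      partition.iterator.map(row => (row.id, convert(row)))
    }
    (out, setupCalls)
  }

  def main(args: Array[String]): Unit = {
    val (rowsA, callsA) = perRow()
    val (rowsB, callsB) = perPartition()
    println(s"per-row setup calls: $callsA")       // 6, one per row
    println(s"per-partition setup calls: $callsB") // 2, one per partition
    println(s"same output: ${rowsA == rowsB}")     // true
  }
}
```

Both styles produce the same pairs; only the number of setup runs differs, and that is the saving mapPartitions can offer.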
In your case, you don't have any precomputation or anything else to do once per partition. Inside mapPartitions, you are simply iterating over each row and mapping it to (row.id, row).
So mapPartitions won't bring any benefit here, and you can use a simple map.
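As a rough check of that conclusion, the sketch below mimics the pipeline from the question in plain Scala, without Spark. It assumes a hypothetical Rec type whose combineElem sums a field, uses a Seq of Seqs as stand-in partitions, and replaces reduceByKey with a local groupBy and reduce. It counts how many times the per-row keying function runs in each form.

```scala
// Plain-Scala simulation, no Spark dependency. All names are hypothetical.
object SamePerRowWork {
  // Stand-in record; combineElem is an assumed merge that sums totals.
  final case class Rec(id: Int, total: Long) {
    def combineElem(other: Rec): Rec = Rec(id, total + other.total)
  }

  // Counts invocations of the per-row keying function.
  var keyCalls = 0
  def keyed(r: Rec): (Int, Rec) = { keyCalls += 1; (r.id, r) }

  // Two simulated partitions.
  val partitions: Seq[Seq[Rec]] = Seq(
    Seq(Rec(1, 10L), Rec(2, 7L)),
    Seq(Rec(1, 5L), Rec(3, 1L))
  )

  // Local stand-in for reduceByKey followed by keeping only the values.
  def reduceValues(pairs: Seq[(Int, Rec)]): Seq[Rec] =
    pairs
      .groupBy(_._1)
      .values
      .map(group => group.map(_._2).reduce((a, b) => a.combineElem(b)))
      .toSeq
      .sortBy(_.id)

  // map style: apply the keying function to every row directly.
  def viaMap(): (Seq[Rec], Int) = {
    keyCalls = 0
    val pairs = partitions.flatten.map(keyed)
    (reduceValues(pairs), keyCalls)
  }

  // mapPartitions style: take each partition's iterator and map over it.
  def viaMapPartitions(): (Seq[Rec], Int) = {
    keyCalls = 0
    val pairs = partitions.flatMap(partition => partition.iterator.map(keyed))
    (reduceValues(pairs), keyCalls)
  }

  def main(args: Array[String]): Unit = {
    val (resA, callsA) = viaMap()
    val (resB, callsB) = viaMapPartitions()
    println(s"keying calls via map: $callsA")           // 4
    println(s"keying calls via mapPartitions: $callsB") // 4
    println(s"same result: ${resA == resB}")            // true
  }
}
```

In this simulation the keying function still runs once per row in both forms, which matches the point that wrapping a plain per-row map in mapPartitions does not remove any work.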
Upvotes: 5