mjbsgll

Reputation: 742

Spark Scala: mapPartitions in this use case

I have been reading a lot about the differences between map and mapPartitions, but I still have some doubts. After reading, I decided to replace the map functions in my code with mapPartitions, because apparently mapPartitions is faster than map.

My question is whether my decision is right in scenarios like the following (comments show the previous code):

val reducedRdd = rdd.mapPartitions(partition => partition.map(r => (r.id, r)))
//val reducedRdd = rdd.map(r => (r.id, r))
.reduceByKey((r1, r2) => r1.combineElem(r2))
// .map(e => e._2)
.mapPartitions(partition => partition.map(e => e._2))

Am I thinking about this correctly? Thanks!

Upvotes: 3

Views: 2139

Answers (2)

DaRkMaN

Reputation: 1054

tl;dr mapPartitions will be fast in this case.

Why

Consider a function:

def someFunc(row: Row): Row = {
  // do some processing on row
  // return a new row
}

Say we are processing 1 million records.

map

We will end up calling someFunc 1 million times.
That is basically 1 million virtual function calls, plus the associated kernel data structures created for the processing.

mapPartitions

We would write this as:

rdd.mapPartitions { partIter =>
  partIter.map { row =>
    // do some processing on row
    // return new row
  }
}

There are no per-row virtual function calls or context switches here.

Hence mapPartitions will be faster.
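To make the call-count difference concrete, here is a minimal sketch using plain Scala collections (no Spark; the `grouped(100)` call just pretends we have 10 partitions, and the counters are illustrative):

```scala
// Plain-Scala sketch contrasting the two call patterns.
object CallCountDemo {
  def main(args: Array[String]): Unit = {
    val rows = (1 to 1000).toList

    // map-style: the user closure is invoked once per row
    var perRowCalls = 0
    val mapped = rows.map { r => perRowCalls += 1; r * 2 }

    // mapPartitions-style: the outer closure is invoked once per "partition"
    var perPartitionCalls = 0
    val viaPartitions = rows.grouped(100).flatMap { partIter =>
      perPartitionCalls += 1 // one invocation per partition
      partIter.map(_ * 2)    // per-row work still happens inside
    }.toList

    println(s"per-row closure calls: $perRowCalls")             // 1000
    println(s"per-partition closure calls: $perPartitionCalls") // 10
    assert(mapped == viaPartitions)
  }
}
```

The per-row work does not disappear with mapPartitions; only the number of times the outer user closure is invoked changes.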

Also, as mentioned in @moriarty007's answer, we need to factor in the object-creation overhead involved with each operation when deciding which operator to use.

Also, I would recommend using DataFrame transformations and actions to do the processing, where the compute can be even faster, since Spark Catalyst optimizes your code and also takes advantage of code generation.

Upvotes: 1

moriarty007

Reputation: 2224

In your case, mapPartitions should not make any difference.

mapPartitions vs map

mapPartitions is useful when we have some common computation that we want to do once per partition. Example:

rdd.mapPartitions { partition =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  partition.map { row =>
    (row.id, complicatedRowConverter(row))
  }
}

In the above example, we are creating a complicatedRowConverter function which is derived from some costly computation. This function will be the same for the entire RDD partition, and we don't need to recreate it again and again. The other way to do the same thing would be:

rdd.map { row =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  (row.id, complicatedRowConverter(row))
}

This will be slow because we unnecessarily run val complicatedRowConverter = <SOME-COSTLY-COMPUTATION> for every row.
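The cost difference between the two versions can be sketched with plain Scala iterators (no Spark needed; `costlyInit` stands in for the costly computation above, and the counter shows how often it runs under each approach):

```scala
// Plain-Scala sketch: initialize once per partition vs. once per row.
object InitCostDemo {
  def main(args: Array[String]): Unit = {
    var initCalls = 0
    // Stand-in for the costly computation: builds a converter function
    def costlyInit(): Int => Int = { initCalls += 1; x => x + 1 }

    val rows = (1 to 100).toList
    val partitions = rows.grouped(25).toList // pretend: 4 partitions

    // mapPartitions-style: converter built once per partition
    initCalls = 0
    val a = partitions.flatMap { part =>
      val converter = costlyInit()
      part.map(converter)
    }
    println(s"init calls (per partition): $initCalls") // 4

    // map-style: converter rebuilt for every row
    initCalls = 0
    val b = rows.map { r => costlyInit()(r) }
    println(s"init calls (per row): $initCalls") // 100

    assert(a == b) // same result, very different setup cost
  }
}
```

Both versions produce the same output; only the number of times the costly initialization runs differs.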

In your case, you don't have any precomputation or other per-partition work. In the mapPartitions, you are simply iterating over each row and mapping it to (row.id, row).

So mapPartitions won't provide any benefit here, and you can use a simple map.
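For completeness, a sketch of the pipeline from the question rewritten with plain map (rdd, r.id, and combineElem all come from the question; this is not runnable on its own):

```scala
// Sketch only: rdd, r.id, and combineElem are from the question.
val reducedRdd = rdd
  .map(r => (r.id, r))                          // plain map instead of mapPartitions
  .reduceByKey((r1, r2) => r1.combineElem(r2))
  .map(_._2)                                    // or equivalently .values on the pair RDD
```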

Upvotes: 5
