Does flatMap give better performance than filter + map?

I have quite a big dataset (100 million+ records with hundreds of columns) that I am processing with Spark. I am reading the data into a Spark Dataset, and I want to filter it and map a subset of its fields to a case class.

The code looks something like this:

case class Subset(name:String,age:Int)
case class Complete(name:String,field1:String,field2....,age:Int)

val ds = spark.read.format("csv").load("data.csv").as[Complete]

// approach 1
ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))

// approach 2
ds.flatMap(x => if (x.age > 25) Seq(Subset(x.name, x.age)) else Seq.empty)

Which approach is better? Any additional hints on how I can make this code more performant?

Thanks!

Edit

I ran some tests to compare the run times, and it looks like approach 2 is quite a bit faster. The code I used to get the run times is as follows:

val subset = spark.time {
   ds.filter(x=>x.age>25).map(x=> Subset(x.name,x.age))
}

spark.time {
   subset.count()
}

and 

val subset2 = spark.time {
   ds.flatMap(x=>if(x.age>25) Seq(Subset(x.name,x.age)) else Seq.empty)
}

spark.time {
   subset2.count()
}

Upvotes: 6

Views: 3204

Answers (2)

Mike Allen

Reputation: 8299

Update: My original answer contained an error: Spark does support Seq as the result of a flatMap (and converts the result back into a Dataset). Apologies for the confusion. I also added more information on improving the performance of your analysis.

Update 2: I missed that you're using a Dataset rather than an RDD (doh!). This doesn't affect the answer significantly.

Spark is a distributed system that partitions data across multiple nodes and processes it in parallel. In terms of efficiency, operations that trigger re-partitioning (requiring data to be transferred between nodes) are far more expensive at run time than in-place modifications. Also note that operations that merely transform data, such as filter, map and flatMap, are only recorded and do not execute until an action (such as count, collect or reduce) is performed. Consequently, neither alternative actually does anything as things stand.
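To make the laziness concrete, here is a minimal sketch, assuming a local SparkSession and a tiny in-memory Dataset. `Person`, `LazyTimingSketch` and `countOlderThan25` are hypothetical names for illustration, not part of the question's code. It suggests why timing the line that builds the Dataset tells you very little: the work only happens inside the action.

```scala
import org.apache.spark.sql.SparkSession

object LazyTimingSketch {
  // Hypothetical stand-ins for the question's case classes.
  final case class Person(name: String, age: Int)
  final case class Subset(name: String, age: Int)

  def countOlderThan25(spark: SparkSession): Long = {
    import spark.implicits._
    val ds = Seq(Person("Ann", 31), Person("Bob", 19), Person("Cy", 42)).toDS()

    // Transformations only: this builds a plan and returns immediately,
    // so timing this statement alone measures almost nothing.
    val subset = ds
      .filter((p: Person) => p.age > 25)
      .map((p: Person) => Subset(p.name, p.age))

    // count() is an action; the filter and map actually run here.
    spark.time { subset.count() }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local mode, for illustration only
      .appName("lazy-timing-sketch")
      .getOrCreate()
    try println(countOlderThan25(spark))
    finally spark.stop()
  }
}
```

When comparing the two approaches, it is probably fairer to time only the action, and to run each variant several times, since the first run also pays for start-up and file reading.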

When an action is performed on the result of these transformations, I would expect the filter operation to be far more efficient: it only processes data (using the subsequent map operation) that passes the predicate x=>x.age>25 (more typically written as _.age > 25). While it may appear that filter creates an intermediate collection, it executes lazily. As a result, Spark appears to fuse the filter and map operations together.
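As a rough analogy in plain Scala (no Spark involved, so this is not a description of Spark's internals), a lazy Iterator shows the same behaviour: filter followed by map runs as a single pass, and the map function is only called for elements that pass the predicate. `Person`, `LazyFusionSketch` and `run` are hypothetical names used only for this sketch.

```scala
object LazyFusionSketch {
  // Hypothetical stand-ins for the question's case classes.
  final case class Person(name: String, age: Int)
  final case class Subset(name: String, age: Int)

  // Returns the projected rows plus how many times the map function ran.
  def run(people: Seq[Person]): (List[Subset], Int) = {
    var mapCalls = 0
    val pipeline = people.iterator
      .filter(_.age > 25)
      .map { p => mapCalls += 1; Subset(p.name, p.age) }
    // Nothing has been evaluated so far; toList drives the pipeline,
    // pulling each element through filter and then map in one pass.
    val rows = pipeline.toList
    (rows, mapCalls)
  }

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("Ann", 31), Person("Bob", 19), Person("Cy", 42))
    val (rows, mapCalls) = run(people)
    println(s"rows=$rows mapCalls=$mapCalls")
  }
}
```

No intermediate collection of filtered elements is built here, which is the property the paragraph above attributes to the filter + map version.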

Your flatMap operation is, frankly, hideous. It forces processing, sequence creation and subsequent flattening of every data item, which will definitely increase overall processing.

That said, the best way to improve the performance of your analysis is to control the partitioning so that the data is split roughly evenly over as many nodes as possible. Refer to this guide as a good starting point.
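As a minimal sketch of what controlling the partitioning can look like, assuming a local SparkSession and a toy Dataset: `repartition(n)` redistributes the rows over `n` partitions, and `rdd.getNumPartitions` reports the current count. The value 8 and the names `RepartitionSketch`, `Person` and `spread` are illustrative assumptions; a sensible partition count depends on your cluster and data size.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object RepartitionSketch {
  // Hypothetical stand-in for the question's Complete case class.
  final case class Person(name: String, age: Int)

  // Redistributes the rows over `n` partitions. This triggers a shuffle,
  // so it is worth doing once up front rather than repeatedly.
  def spread(ds: Dataset[Person], n: Int): Dataset[Person] = ds.repartition(n)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local mode, for illustration only
      .appName("repartition-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(Person("Ann", 31), Person("Bob", 19), Person("Cy", 42)).toDS()
      println(s"partitions before: ${ds.rdd.getNumPartitions}")
      val even = spread(ds, 8) // 8 is an arbitrary illustrative value
      println(s"partitions after: ${even.rdd.getNumPartitions}")
    } finally spark.stop()
  }
}
```

Because the repartition itself moves data between nodes, it only pays off when the work done afterwards is large enough to benefit from the more even spread.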

Upvotes: 5

klswt

Reputation: 39

Judging by the logic of the syntax, the first approach should use less space, since flatMap expands to .map().flatten, both on an argument of equal size. It compiles to the same Java bytecode in the Scala REPL (edit: when using a pet example, which obviously is no substitute for actually testing it with comparably large data).
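The equivalence mentioned above can be checked on plain Scala collections with a small sketch like the one below. It only shows that the variants produce the same result; it says nothing about how Spark executes them or how they perform at scale. `Person` and `FlatMapExpansionSketch` are hypothetical names for illustration.

```scala
object FlatMapExpansionSketch {
  // Hypothetical stand-ins for the question's case classes.
  final case class Person(name: String, age: Int)
  final case class Subset(name: String, age: Int)

  // Approach 2 from the question: one small Seq is created per input element.
  def viaFlatMap(people: Seq[Person]): Seq[Subset] =
    people.flatMap(p => if (p.age > 25) Seq(Subset(p.name, p.age)) else Seq.empty[Subset])

  // The same thing spelled out as map followed by flatten.
  def viaMapFlatten(people: Seq[Person]): Seq[Subset] =
    people.map(p => if (p.age > 25) Seq(Subset(p.name, p.age)) else Seq.empty[Subset]).flatten

  // Approach 1 from the question: no per-element wrapper sequences.
  def viaFilterMap(people: Seq[Person]): Seq[Subset] =
    people.filter(_.age > 25).map(p => Subset(p.name, p.age))

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("Ann", 31), Person("Bob", 19), Person("Cy", 42))
    println(viaFlatMap(people) == viaMapFlatten(people))
    println(viaFlatMap(people) == viaFilterMap(people))
  }
}
```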

Upvotes: -1
