Vijay

Reputation: 3650

Spark: Is a single pipelined Scala command better than separate commands?

I am using Spark with Scala. I wanted to know if having a single one-line command is better than separate commands. What are the benefits, if any? Does it gain more efficiency in terms of speed? Why?

For example:

var d = data.filter(_(1)==user).map(f => (f(2),f(5).toInt)).groupByKey().map(f=> (f._1,f._2.count(x=>true), f._2.sum))

against

var a = data.filter(_(1)==user)
var b = a.map(f => (f(2),f(5).toInt))
var c = b.groupByKey()
var d = c.map(f=> (f._1,f._2.count(x=>true), f._2.sum))

Upvotes: 1

Views: 1541

Answers (2)

Vidya

Reputation: 30310

There is no difference in terms of execution, but you might want to consider the readability of your code. I would go with your first example but like this:

var d = data.filter(_(1)==user)
.map(f => (f(2),f(5).toInt))
.groupByKey()
.map(f=> (f._1,f._2.count(x=>true), f._2.sum))

Really, this is more of a Scala question than a Spark one, though. Still, as you can see from the word count implementation shown in Spark's documentation,

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

you don't need to worry about those kinds of things. The Scala language (through laziness, etc.) and Spark's RDD implementation handle all that at a higher level of abstraction.
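To see the Scala side of that in isolation, here is a plain-Scala sketch (no Spark involved): a lazy collection view fuses the chained filter and map into a single pass, whether you write them on one line or several.

// A .view makes the pipeline lazy: filter and map are fused into one
// traversal, and nothing is evaluated until a terminal call like toList.
val pipeline = (1 to 1000000).view
  .filter(_ % 2 == 0)  // not evaluated yet
  .map(_ * 2)          // not evaluated yet

println(pipeline.take(5).toList)  // evaluation happens here, element by element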

If you find really bad performance, then you should take the time to explore why. As Knuth said, "premature optimization is the root of all evil."

Upvotes: 0

Josh Rosen

Reputation: 13801

There is no performance difference between your two examples; the decision to chain RDD transformations or to explicitly represent the intermediate RDDs is just a matter of style. Spark's lazy evaluation means that no actual distributed computation will be performed until you invoke an RDD action like take() or count().
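To make that concrete with the question's own code (assuming data is an RDD of arrays of string fields, as in the question), this sketch shows that neither version does any work until an action runs:

val d = data.filter(_(1) == user)            // lazy: just records the lineage
            .map(f => (f(2), f(5).toInt))    // lazy
            .groupByKey()                    // lazy: the shuffle is only planned
            .map(f => (f._1, f._2.count(x => true), f._2.sum))

val sample = d.take(5)  // the first action: only now does Spark run a job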

During execution, Spark will pipeline as many transformations as possible. For your example, Spark won't materialize the entire filtered dataset before it maps it: the filter() and map() transformations will be pipelined together and executed in a single stage. The groupByKey() transformation (usually) needs to shuffle data over the network, so it's executed in a separate stage. Spark would materialize the output of filter() only if it had been cache()d.
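You can inspect those stage boundaries yourself: RDD.toDebugString prints the lineage, and the indentation in its output shows where the shuffle introduced by groupByKey() splits the job into stages.

// Continuing the example above: the ShuffledRDD in the printed lineage
// marks the boundary between the pipelined filter/map stage and the
// post-shuffle stage.
println(d.toDebugString)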

You might need to use the second style if you want to cache an intermediate RDD and perform further processing on it. For example, if I wanted to perform multiple actions on the output of the groupByKey() transformation, I would write something like

val grouped = data.filter(_(1)==user)
                  .map(f => (f(2),f(5).toInt))
                  .groupByKey()
                  .cache()
val mapped = grouped.map(f=> (f._1,f._2.count(x=>true), f._2.sum))
val counted = grouped.count()
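Without the cache(), each action would recompute the entire lineage from data, shuffle included: the count() here and any later action on mapped would each re-run filter(), map(), and groupByKey(), because RDDs are re-evaluated from scratch on every action unless they are cached.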

Upvotes: 6
