Reputation: 18043
EX1. This with an RDD gives a serialization error, as we expect, with or without the enclosing object, val num being the culprit. Fine:
object Example {
  val r = (1 to 1000000).toList
  val rdd = sc.parallelize(r, 3)
  val num = 1
  val rdd2 = rdd.map(_ + num)
  rdd2.collect
}
Example
EX2. Using a DataFrame in a similar fashion, however, does not. Why is that, when it looks much the same? What am I missing here?
object Example {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  val n = 1
  val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
  )).toDF("ID", "a", "b")
  df.repartition(3).withColumn("plus1", $"b" + n).show(false)
}
Example
The reason is not entirely clear to me for the DataFrame case; I would expect similar behaviour. It looks like Datasets circumvent some of these issues, but I may well be missing something.
I am running on Databricks, which does surface plenty of serialization issues elsewhere, so I do not think the environment is affecting things; it is handy for testing.
Upvotes: 0
Views: 753
Reputation: 36
The reason is simple and more fundamental than the distinction between RDD and Dataset:
The first piece of code passes a function, _ + num, to map, so its closure has to be computed and serialized.
The second piece of code doesn't. The following, $"b" + n, is just a value (a Column expression built on the driver), therefore no closure computation and subsequent serialization is required.
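To make that concrete, here is a minimal sketch, assuming a spark-shell or Databricks session with spark.implicits._ in scope (as in the question's EX2); the names c and c2 are only illustrative:

import org.apache.spark.sql.functions.lit

val n = 1
val c = $"b" + n        // a Column value, built eagerly on the driver
// c wraps a Catalyst expression tree in which the literal 1 is already embedded,
// so nothing here captures n or the enclosing object; there is no user closure to serialize
val c2 = $"b" + lit(n)  // equivalent explicit form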
If this is still not clear, you can think about it this way: if your Dataset code were closer to its RDD counterpart, for example:
object Example {
  import spark.implicits._
  val num = 1
  // _ + num is a Scala function, so its closure (and the enclosing Example) must be serialized
  spark.range(1000).map(_ + num).collect
}
or
object Example {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  val num = 1
  // the UDF wraps a Scala closure that captures num, so it must be serialized too
  val f = udf((x: Int) => x + num)
  spark.range(1000).select(f($"id")).collect
}
it would fail with a serialization exception, the same as the RDD version does.
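For completeness, the usual way to make such closure-based versions pass is to copy the field into a local val, so the closure captures only a plain value rather than the enclosing object. A minimal sketch along the lines of the question's EX1, assuming the spark-shell sc; the run method and localNum are illustrative, not part of the original code:

object Example {
  val num = 1
  def run(): Array[Int] = {
    val localNum = num                // plain Int copy, cheap to serialize
    sc.parallelize(1 to 1000000, 3)
      .map(_ + localNum)              // closure no longer references Example
      .collect()
  }
}
Example.run()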
Upvotes: 2