Reputation: 52253
(Scala-specific question.)
While the Spark docs encourage using the DataFrame API where possible, when it is insufficient the choice is usually between falling back to the RDD API and using UDFs. Is there an inherent performance difference between these two alternatives?
RDDs and UDFs are similar in that neither can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and if there is, does it differ between the two approaches?
To give a specific example, let's say I have a DataFrame that contains a column of text data with custom formatting (not amenable to regexp matching). I need to parse that column and add a new vector column that contains the resulting tokens.
Upvotes: 11
Views: 5073
Reputation: 15539
(Note: I don't have measurements to back this up.)
To me, shuffle and (de)serialization are the main costs. After these, having clean code is most important. With that in mind:
The main drawback of using RDD operations is that (de)serialization from/into full JVM objects is required, while a UDF might only (de)serialize the required columns. Note that this applies when processing column-oriented data such as Parquet; for other data formats I don't know, but I would expect both to perform similarly in many cases.
So, if your algorithm consists mostly of filtering and shuffling operations, and/or can be expressed simply with DataFrame operations and local UDFs, you should use those. However, if your algorithm requires complex processing over many columns, it is probably better to pay the deserialization cost up front and execute clean, efficient Scala code on JVM objects.
So, in my personal experience, where I implement complex mathematical algorithms, I typically split the code into two steps:
Upvotes: 2
Reputation: 330113
"neither of them can benefit from Catalyst and Tungsten optimizations"
This is not exactly true. While UDFs don't benefit from Tungsten optimizations (arguably, simple SQL transformations don't get a huge boost there either), you may still benefit from the execution plan optimizations provided by Catalyst. Let's illustrate that with a simple example (note: Spark 2.0 and Scala; don't extrapolate this to earlier versions, especially with PySpark):
import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._  // for toDF and the $"..." column syntax

val f = udf((x: String) => x == "a")  // used as a filter predicate
val g = udf((x: Int) => x + 1)        // applied to the aggregated column

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]
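The execution plan shows us a couple of things: the filter on the first UDF sits below the aggregation, so the selection has been pushed down before it; and the second UDF does not appear in the plan at all, because the projection has been pushed down as well.
Depending on the data and pipeline, this can provide a substantial performance boost almost for free.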
That being said, both RDDs and UDFs require conversions between the safe and unsafe representations, with the latter being significantly less flexible. Still, if the only thing you need is simple map-like behavior without initializing expensive objects (like database connections), then a UDF is the way to go.
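For the tokenizing case from the question, a map-like UDF could look roughly like the sketch below. It is meant for spark-shell or a Scala script. Note that `parseTokens` and its `|` delimiter are hypothetical stand-ins for the real custom format, and the result is a plain array-of-strings column rather than an ML vector.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder
  .master("local[*]")
  .appName("udf-tokenize-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical parser: splits on '|', trims, and drops empty tokens.
// Replace the body with whatever the custom format actually needs.
def parseTokens(s: String): Seq[String] =
  if (s == null) Seq.empty[String]
  else s.split('|').map(_.trim).filter(_.nonEmpty).toSeq

// Wrap the plain Scala function as a UDF; no per-row setup is needed.
val tokenize = udf(parseTokens _)

val df = Seq((1, "a | b|c"), (2, "d||e")).toDF("id", "text")

// Adds an array<string> column next to the original text column.
val withTokens = df.withColumn("tokens", tokenize($"text"))
```

Only the `text` column goes through the UDF; the rest of the plan stays visible to Catalyst.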
In slightly more complex scenarios you can easily drop down to a generic Dataset and reserve RDDs for cases where you really need access to low-level features like custom partitioning.
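As a rough sketch of the Dataset route, `mapPartitions` lets you build an expensive object once per partition instead of once per row. The `Doc`, `Parsed`, and `ExpensiveParser` names are hypothetical and only illustrate the pattern; the snippet assumes spark-shell or a Scala script.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record types for the input and the parsed output.
case class Doc(id: Int, text: String)
case class Parsed(id: Int, tokens: Seq[String])

// Hypothetical stand-in for something costly to create (e.g. a connection).
final class ExpensiveParser {
  def parse(s: String): Seq[String] =
    s.split('|').map(_.trim).filter(_.nonEmpty).toSeq
}

val spark = SparkSession.builder
  .master("local[*]")
  .appName("dataset-mappartitions-sketch")
  .getOrCreate()
import spark.implicits._

val docs = Seq(Doc(1, "a | b|c"), Doc(2, "d||e")).toDS()

val parsed = docs.mapPartitions { rows =>
  // Created on the executor, once per partition, so it need not be serializable.
  val parser = new ExpensiveParser
  rows.map(d => Parsed(d.id, parser.parse(d.text)))
}
```

The result is still a Dataset, so later DataFrame-style operations on `parsed` go through Catalyst as usual.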
Upvotes: 18