Reputation: 3168
I have a dataframe with the following code:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Now checking the logs, I found out that for each row the UDF is executed 3 times. If I add the "test3" from a "test.three" column then the UDF is executed once more.
Can someone explain me why?
Can this be avoid properly (without caching the dataframe after "test" is added, even if this works)?
Upvotes: 11
Views: 5829
Reputation: 3344
If you want to avoid multiple calls to a udf (which is useful especially if the udf is a bottleneck in your job) you can do it as follows:
val testUDF = udf(test _).asNondeterministic()
Basically you tell Spark that your function is not deterministic and now Spark makes sure it is called only once because it is not safe to call it multiple times (each call could possibly return different result).
Also be aware that this trick is not for free, by doing this you are putting some constraints on the optimizer, one side effect of this is for example that Spark optimizer does not push filters through expressions that are not deterministic so you become responsible for optimal position of the filters in your query.
Upvotes: 12