BlueSheepToken

Reputation: 6099

Why does Spark cast udf-generated columns before calling another udf but not raw columns?

I am trying to use a udf defined as Seq[Double] => Seq[Double].

When I use it on a "raw" array<int> defined at the creation of the DataFrame, Spark does not cast it to array<double> before calling my udf.

However, when I generate an array<int> from another udf, Spark casts the column to array<double> before calling my udf.

What is the philosophy behind these casts? Which Analyzer rule is responsible for this cast?

Here is some code to illustrate/reproduce the behaviour:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(1)))),
  StructType(
    StructField("array_int", ArrayType(IntegerType, true), false) ::
    Nil
  )
)

df.show
/**
+---------+
|array_int|
+---------+
|      [1]|
+---------+
*/

val f = udf ((v: Seq[Double]) => v)
val generateIntArrays = udf(() => Array.fill(2)(1))


val df1 = df.withColumn("f", f(col("array_int"))) // df1.show fails at runtime: Spark does not cast array_int before calling f
val df2 = df.withColumn("b", generateIntArrays()).withColumn("f", f(col("b"))) // df2.show works at runtime: Spark casts col("b") to array<double> before calling f

df1.explain // no cast
/**
== Physical Plan ==
*(1) Project [array_int#778, UDF(array_int#778) AS f#781]
+- *(1) Scan ExistingRDD[array_int#778]
*/

df2.explain // cast to array<double> before calling `f`
/**
== Physical Plan ==
*(1) Project [array_int#778, UDF() AS b#804, UDF(cast(UDF() as array<double>)) AS f#807]
+- *(1) Scan ExistingRDD[array_int#778]
*/

Upvotes: 2

Views: 138

Answers (1)

mck

Reputation: 42352

It seems that if you set the array element type to non-nullable, then Spark will cast the column to array<double>.

val f = udf ((v: Seq[Double]) => v)

// your code: nullable array element
spark.createDataFrame(
    sc.parallelize(Seq(Row(Seq(1)))),
    StructType(List(StructField("array_int", ArrayType(IntegerType, true), false)))
).withColumn("f", f(col("array_int"))).explain

== Physical Plan ==
*(1) Project [array_int#313, UDF(array_int#313) AS f#315]
+- *(1) Scan ExistingRDD[array_int#313]

// non-nullable array element
spark.createDataFrame(
    sc.parallelize(Seq(Row(Seq(1)))),
    StructType(List(StructField("array_int", ArrayType(IntegerType, false), false)))
).withColumn("f", f(col("array_int"))).explain

== Physical Plan ==
*(1) Project [array_int#301, UDF(cast(array_int#301 as array<double>)) AS f#303]
+- *(1) Scan ExistingRDD[array_int#301]

There are also some interesting observations for a udf that takes a double and is called on an integer column. Again, the query plan depends on the nullability of the column.

val f = udf ((v: Double) => v)

// nullable integer
spark.createDataFrame(
    sc.parallelize(Seq(Row(1))),
    StructType(List(StructField("int", IntegerType, true)))
).withColumn("F", f(col("int"))).explain

== Physical Plan ==
*(1) Project [int#359, if (isnull(cast(int#359 as double))) null else UDF(knownnotnull(cast(int#359 as double))) AS F#361]
+- *(1) Scan ExistingRDD[int#359]

// non-nullable integer
spark.createDataFrame(
    sc.parallelize(Seq(Row(1))),
    StructType(List(StructField("int", IntegerType, false)))
).withColumn("F", f(col("int"))).explain

== Physical Plan ==
*(1) Project [int#365, UDF(cast(int#365 as double)) AS F#367]
+- *(1) Scan ExistingRDD[int#365]

I suppose the reason behind this behaviour is null handling: the UDF cannot accept a null / an array of nulls, so nulls need to be handled before the UDF is called. For a nullable scalar column, Spark wraps the call in a null check (as in the plan above), but perhaps it cannot figure out a way to handle nulls inside an array.
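
If that is the case, then as long as you know the arrays never actually contain null elements, one way to sidestep the issue is to add the cast yourself instead of relying on the analyzer. A minimal sketch, using the same nullable-element schema as in the question:

val f = udf ((v: Seq[Double]) => v)

// explicit cast: f now receives array<double>, so the plan contains the cast and evaluation succeeds
spark.createDataFrame(
    sc.parallelize(Seq(Row(Seq(1)))),
    StructType(List(StructField("array_int", ArrayType(IntegerType, true), false)))
).withColumn("f", f(col("array_int").cast("array<double>"))).explain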

Upvotes: 1
