Reputation: 6099
I am trying to use a udf defined as Seq[Double] => Seq[Double].
When I use it on a "raw" array<int> column defined at the creation of the DataFrame, Spark does not cast it to array<double> before calling my udf.
However, when I generate an array<int> from another udf, Spark casts the column to array<double> before calling my udf.
What is the philosophy behind these casts? Which Analyzer rule is responsible for this cast?
Here is some code to illustrate/reproduce the behaviour:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(1)))),
  StructType(
    StructField("array_int", ArrayType(IntegerType, true), false) ::
    Nil
  )
)
df.show
/**
+---------+
|array_int|
+---------+
| [1]|
+---------+
*/
val f = udf((v: Seq[Double]) => v)
val generateIntArrays = udf(() => Array.fill(2)(1))

val df1 = df.withColumn("f", f(col("array_int"))) // df1.show fails at runtime: Spark does not cast array_int before calling f
val df2 = df.withColumn("b", generateIntArrays()).withColumn("f", f(col("b"))) // df2.show works at runtime: Spark explicitly casts the output of col("b") before calling f
df1.explain // no cast
/**
== Physical Plan ==
*(1) Project [array_int#778, UDF(array_int#778) AS f#781]
+- *(1) Scan ExistingRDD[array_int#778]
*/
df2.explain // cast to array<double> before calling `f`
/**
== Physical Plan ==
*(1) Project [array_int#778, UDF() AS b#804, UDF(cast(UDF() as array<double>)) AS f#807]
+- *(1) Scan ExistingRDD[array_int#778]
*/
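A possible workaround (my own sketch, not part of the original reproduction) is to cast the column explicitly so that f already receives an array<double>; note this only helps if the array does not actually contain null elements, since Seq[Double] cannot represent them:
// Illustrative only: force the cast by hand before calling f.
val df1Casted = df.withColumn("f", f(col("array_int").cast(ArrayType(DoubleType, true))))
df1Casted.explain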
Upvotes: 2
Views: 138
Reputation: 42352
It seems that if you set the array element to non-nullable, then the column will be cast to array<double>.
val f = udf((v: Seq[Double]) => v)

// your code: nullable array element
spark.createDataFrame(
  sc.parallelize(Seq(Row(Seq(1)))),
  StructType(List(StructField("array_int", ArrayType(IntegerType, true), false)))
).withColumn("f", f(col("array_int"))).explain
== Physical Plan ==
*(1) Project [array_int#313, UDF(array_int#313) AS f#315]
+- *(1) Scan ExistingRDD[array_int#313]
// non-nullable array element
spark.createDataFrame(
  sc.parallelize(Seq(Row(Seq(1)))),
  StructType(List(StructField("array_int", ArrayType(IntegerType, false), false)))
).withColumn("f", f(col("array_int"))).explain
== Physical Plan ==
*(1) Project [array_int#301, UDF(cast(array_int#301 as array<double>)) AS f#303]
+- *(1) Scan ExistingRDD[array_int#301]
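To check which variant a given column actually has (an illustrative sketch, not from the original answer, assuming df is the question's DataFrame), you can inspect the containsNull flag of its schema:
// Print the containsNull flag of the array column before calling the udf.
df.schema("array_int").dataType match {
  case ArrayType(_, containsNull) => println(s"containsNull = $containsNull")
  case other                      => println(s"not an array column: $other")
}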
There are also some interesting observations for UDFs that take doubles and are called on an integer column. Again, the query plan depends on the nullability of the column.
val f = udf((v: Double) => v)

// nullable integer
spark.createDataFrame(
  sc.parallelize(Seq(Row(1))),
  StructType(List(StructField("int", IntegerType, true)))
).withColumn("F", f(col("int"))).explain
== Physical Plan ==
*(1) Project [int#359, if (isnull(cast(int#359 as double))) null else UDF(knownnotnull(cast(int#359 as double))) AS F#361]
+- *(1) Scan ExistingRDD[int#359]
// non-nullable integer
spark.createDataFrame(
  sc.parallelize(Seq(Row(1))),
  StructType(List(StructField("int", IntegerType, false)))
).withColumn("F", f(col("int"))).explain
== Physical Plan ==
*(1) Project [int#365, UDF(cast(int#365 as double)) AS F#367]
+- *(1) Scan ExistingRDD[int#365]
I suppose the reason behind this behaviour is null handling (the UDF cannot accept a null value or an array containing nulls, so they need to be handled before the UDF is called). Perhaps Spark cannot figure out a way to handle nulls inside an array.
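One way that might side-step the problem for array elements (my own sketch, not verified across Spark versions, assuming df is the DataFrame from the question) is to declare the udf over boxed java.lang.Double instead of the primitive Double, so the expected input type becomes an array<double> with nullable elements and the implicit cast can be inserted again:
// Sketch: boxed elements make the expected type ArrayType(DoubleType, containsNull = true),
// so a nullable array<int> column can be cast implicitly; null elements arrive as null references.
val fBoxed = udf { (v: Seq[java.lang.Double]) =>
  v.map(x => if (x == null) null else java.lang.Double.valueOf(x.doubleValue * 2))
}
df.withColumn("f", fBoxed(col("array_int"))).explain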
Upvotes: 1