javamonkey79

Reputation: 17755

spark udf not being called

Given the following example:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

val testUdf: UserDefinedFunction = udf((a: String, b: String, c: Int) => { 
  val out = s"test1: $a $b $c"
  println(out)
  out
})

val testUdf2: UserDefinedFunction = udf((a: String, b: String, c: String) => { 
  val out = s"test2: $a $b $c"
  println(out)
  out
})

Seq(("hello", "world", null))
.toDF("a", "b", "c")
.withColumn("c", $"c" cast "Int")
.withColumn("test1", testUdf($"a", $"b", $"c"))
.withColumn("test2", testUdf2($"a", $"b", $"c"))
.show

testUdf does not appear to be called. There is no error and no warning; it just returns null.

Is there a way to detect these silent failures? Also, what is going on here?

Spark 2.4.4, Scala 2.11

Upvotes: 1

Views: 982

Answers (3)

yangzhongbao

Reputation: 11

I don't know for certain what causes this, but I think it is most likely because of the implicit conversion Spark applies to the primitive Int argument. Compare the two snippets below:

code1

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder()
      .master("local")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    val testUdf: UserDefinedFunction = udf((a: String, b: String, c: Int) => {
      val out = s"test1: $a $b $c"
      println(out)
      out
    })
    
    Seq(("hello", "world", null))
      .toDF("a", "b", "c")
      .withColumn("test1", testUdf($"a", $"b", $"c"))
      .show

code2

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder()
      .master("local")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    val testUdf: UserDefinedFunction = udf((a: String, b: String, c: String) => {
      val out = s"test1: $a $b $c"
      println(out)
      out
    })

    Seq(("hello", "world", null))
      .toDF("a", "b", "c")
      .withColumn("test1", testUdf($"a", $"b", $"c"))
      .show

[code1 logical plan screenshot]

[code2 logical plan screenshot]
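If you don't want to rely on screenshots, you can print the plans yourself with explain(true). A minimal sketch, assuming the same spark session and the Int-typed testUdf from code1 above; the exact plan text varies by Spark version:

    // With the Int-typed UDF (code1), the analyzed plan guards the call with a
    // null check along the lines of `if (isnull(c)) null else UDF(a, b, c)`,
    // because a Scala Int cannot receive null. With the String-typed UDF
    // (code2), no such guard is inserted, so the UDF actually runs.
    Seq(("hello", "world", null))
      .toDF("a", "b", "c")
      .withColumn("test1", testUdf($"a", $"b", $"c"))
      .explain(true) // prints the parsed, analyzed, optimized, and physical plans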

Upvotes: 2

meniluca

Reputation: 306

You should be getting a scala.MatchError: scala.Null when you cast the null column. Besides that, your UDF definition doesn't work for me either: I get a java.lang.UnsupportedOperationException: Schema for type AnyRef is not supported when I try to register it.

What about this:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

def testUdf(a: String, b: String, c: Integer): String = { 
  val out = s"test1: $a $b $c"
  println(out)
  out
}

def testUdf2(a: String, b: String, c: String): String = { 
  val out = s"test2: $a $b $c"
  println(out)
  out
}

val yourTestUDF = udf(testUdf _)
val yourTestUDF2 = udf(testUdf2 _)

// spark.udf.register("yourTestUDF", yourTestUDF) // just in case you need it in SQL

spark.createDataFrame(Seq(("hello", "world", null.asInstanceOf[Integer])))
.toDF("a", "b", "c")
.withColumn("test1", yourTestUDF($"a", $"b", $"c"))
.withColumn("test2", yourTestUDF2($"a", $"b", $"c"))
.show(false)

Output:

test1: hello world null
test2: hello world null
+-----+-----+----+-----------------------+-----------------------+
|a    |b    |c   |test1                  |test2                  |
+-----+-----+----+-----------------------+-----------------------+
|hello|world|null|test1: hello world null|test2: hello world null|
+-----+-----+----+-----------------------+-----------------------+

Upvotes: 0

pasha701

Reputation: 7207

Scala type "Int" does not allow nulls. Variable "c" type can be changed to "Integer".

Upvotes: 5
