fanbondi

Reputation: 1047

Scala and Spark UDF function

I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I registered the function, but when I call it using SQL it throws a NullPointerException. Below are my function and the process of executing it. I am using Zeppelin. Strangely, this was working yesterday but it stopped working this morning.

Function

def convert( time:String ) : String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}

Register the Function

sqlContext.udf.register("convert",convert _)

Test the function without SQL -- This works

convert("12:12:12") -> returns "12:12"

Test the function with SQL in Zeppelin -- this FAILS.

%sql
select convert(time) from temptable limit 10

Structure of temptable

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)

Part of the stacktrace that I am getting.

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)

Upvotes: 11

Views: 36866

Answers (2)

kfkhalili

Reputation: 1035

You have to define your function as a UDF.

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

val convertUDF: UserDefinedFunction = udf((time:String) => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})

Next, you would apply your UDF to your DataFrame.

// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
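
Since the question calls the function from a %sql paragraph, another option (a sketch of mine, not part of the original answer) is to write the converted DataFrame back out as a temp table, so the SQL side never has to invoke the UDF at all:

// Spark 1.x API, matching the question's sqlContext setup;
// "temptable_converted" is a hypothetical name
val converted = dataFrame.withColumn("time", convertUDF(col("time")))
converted.registerTempTable("temptable_converted")

A %sql paragraph can then run select time from temptable_converted limit 10 without triggering the Hive function lookup seen in the stack trace.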

Now, as to your actual problem: one reason you are receiving this error could be that your DataFrame contains null values in the time column. If you filter them out before you apply the UDF, you should be able to continue without a problem.

dataFrame.filter(col("time").isNotNull)
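
Alternatively, you could make the UDF itself null-safe; this is a minimal sketch of mine (convertSafeUDF is a hypothetical name, not from the original answer):

import scala.util.Try

// Option guards against a null input, Try guards against unparsable values;
// Spark maps a Scala null result back to a SQL NULL
val convertSafeUDF: UserDefinedFunction = udf((time: String) => {
  Option(time).flatMap { t =>
    val sdf = new java.text.SimpleDateFormat("HH:mm")
    Try(sdf.format(sdf.parse(t))).toOption
  }.orNull
})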

I'm curious what else causes a NullPointerException when running a UDF, other than it encountering a null. If you found a reason different from my suggestion, I'd be glad to know.

Upvotes: 5

Rockie Yang

Reputation: 4925

Use udf instead of defining a function directly.

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})

A udf's input parameters are Columns (or a single Column), and its return type is also Column, as you can see in Spark's own UserDefinedFunction source:

case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
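
For example, a minimal usage sketch (dataFrame is an assumed name for the question's DataFrame, not something defined in this answer):

// apply the udf column-wise, replacing the original time column
val result = dataFrame.withColumn("time", convert(col("time")))
result.show(10)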

Upvotes: 16
