Inna

Reputation: 169

org.apache.spark.SparkException: Failed to execute user defined function

I am new to Scala and I am trying to execute the following code:

val SetID = udf{(c:String, d: String) =>
    if( c.UpperCase.contains("EXKLUS") == true)
    {d}
    else {""}
}
val ParquetWithID = STG1
  .withColumn("ID", SetID( col("line_item"), col("line_item_ID")))

Both columns (line_item and line_item_ID) are defined as Strings in the STG1 schema.

I get the following error when I try to run the code:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1$$anonfun$2: (string, string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

and

Caused by: java.lang.NullPointerException
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:356)
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:355)
    ... 16 more

I also tried c.UpperCase().contains("EXKLUS") but I got the same error. However, if I simply run an "if equals" statement, everything works fine. So I guess the problem lies in using the UpperCase().contains(" ") function in my udf, but I do not understand where the problem comes from. Any help would be appreciated!

Upvotes: 5

Views: 21739

Answers (1)

Ramesh Maharjan

Reputation: 41957

If the schema is

 |-- line_item: string (nullable = true)
 |-- line_item_ID: string (nullable = true)

then both columns are nullable, so c can be null at runtime and calling a method on it throws the NullPointerException you see. Adding a null check in your if statement solves the issue (note also that the method on String is toUpperCase, not UpperCase):

val SetID = udf{(c: String, d: String) =>
  if (c != null && c.toUpperCase.contains("EXKLUS")) d
  else ""
}
val ParquetWithID = STG1
  .withColumn("ID", SetID( col("line_item"), col("line_item_ID")))
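The null guard itself can be checked outside Spark with a plain-Scala sketch of the UDF body (the helper name setId is hypothetical, not from the original post):

```scala
// Null-safe version of the UDF's logic: guard against null before
// calling toUpperCase, otherwise fall back to the empty string.
def setId(c: String, d: String): String =
  if (c != null && c.toUpperCase.contains("EXKLUS")) d
  else ""
```

Because && short-circuits, toUpperCase is never invoked when c is null, which is exactly why the guarded UDF no longer throws.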

I hope the answer is helpful

Upvotes: 6
