Ignacio Alorre

Reputation: 7605

Spark - Why do I get an NPE on write.mode(SaveMode.Overwrite) even though the dataframe allows other actions like first or show?

I have a dataframe with 3 columns with a schema similar to this:

org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))

This could be a sample of a row in this dataframe:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]

I am generating a new column by applying a UDF over the last column of this dataframe, the one which is an Array<Map<String,String>>.

This is the code I am applying:

import org.apache.spark.sql.functions.udf

def number_length(num: String): String = { if (num.length < 6) "000000" else num }

def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
  inputSeq.map(x => Map("source" -> x("source"), "document_number" -> number_length(x("document_number")), "first_seen" -> x("first_seen")))
})

val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))

After this everything works fine and I can perform actions like show and first, which return:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]

But if I try to write this dataframe as Avro, like this:

newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")

I get the following error:

WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)

But if I drop this new column, it is possible to write the dataframe.

What am I missing when writing the dataframe? Is the UDF changing something in the schema that I am not aware of?

Upvotes: 1

Views: 516

Answers (1)

Alper t. Turker

Reputation: 35219

Your code throws an NPE in the UDF call. The function you use is not null-safe; it will fail if:

  • inputSeq is null.
  • Any element of inputSeq is null.
  • The document_number value is null in any element of inputSeq.

It would also fail if any of the keys was missing (although that is not the problem here). You have to include proper checks, starting with something like this (not tested):

def number_length( num:String ) : String = num match { 
  case null => null
  case _ => if(num.length < 6) "000000" else num 
}


def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => inputSeq match {
  case null => null
  case xs => xs.map {
    case null => null
    case x => Map(
      "source" -> x("source"),
      "document_number" ->  number_length(x("document_number")),
      "first_seen" -> x("first_seen")
    )
  }
})
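A quick way to sanity-check the fix (just a sketch, not part of the original answer: it assumes a spark-shell session with the implicits in scope and reuses the column names from the question) is to build a one-row dataframe whose document_number is null and confirm the null-safe UDF no longer throws:

import org.apache.spark.sql.functions.col

// One row with a null document_number, the kind of value that made the original UDF throw an NPE
val testDF = Seq(
  ("11223344", "ALAN", Seq(Map("source" -> "central", "document_number" -> (null: String), "first_seen" -> "2018-05-01")))
).toDF("UUID", "NAME", "DOCUMENT")

// With the null-safe version the null is simply carried through instead of blowing up
testDF.withColumn("VALID_DOCUMENT", validating_doc(col("DOCUMENT"))).show(false)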

Why do I get an NPE on write.mode(SaveMode.Overwrite) even though the dataframe allows other actions like first or show?

Because both first and show evaluate only a subset of the data and clearly don't hit the problematic row.
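If you want to see which rows would trigger the NPE before writing, you can force evaluation over the whole DOCUMENT column. A minimal sketch (the helper hasNullDoc is hypothetical, not part of the original code; DF and the column name are taken from the question):

import org.apache.spark.sql.functions.udf

// Flags documents that are null, contain a null element, or have a null document_number
val hasNullDoc = udf((inputSeq: Seq[Map[String, String]]) =>
  inputSeq == null || inputSeq.exists(m => m == null || m.getOrElse("document_number", null) == null)
)

DF.filter(hasNullDoc($"DOCUMENT")).show(false)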

Upvotes: 2
