Reputation: 7605
I have a dataframe with 3 columns which has got a schema similar to this:
org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))
This could be a sample of a row in this dataframe:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]
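For reference, a minimal, untested sketch (not part of the original post) of how a dataframe with an equivalent schema could be rebuilt to reproduce the issue, assuming a spark-shell style environment where sc and sqlContext are available; the extra value 28 visible in the sample row is not covered by the posted schema, so it is omitted here:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical reconstruction of the posted schema and sample row.
val schema = StructType(Seq(
  StructField("UUID", StringType, true),
  StructField("NAME", StringType, true),
  StructField("DOCUMENT", ArrayType(MapType(StringType, StringType, true), true), true)
))

val sample = Seq(
  Row("11223344", "ALAN",
    Seq(Map("source" -> "central", "document_number" -> "1234", "first_seen" -> "2018-05-01")))
)

val DF = sqlContext.createDataFrame(sc.parallelize(sample), schema)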
I am generating a new column by applying a UDF over the last column of this dataframe, the one which is an Array<Map<String,String>>.
This is the code I am applying:
def number_length(num: String): String = { if (num.length < 6) "000000" else num }

def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
  inputSeq.map(x => Map(
    "source" -> x("source"),
    "document_number" -> number_length(x("document_number")),
    "first_seen" -> x("first_seen")))
})
val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))
After this everything works fine and I can perform actions like show and first, which return:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]
But if I try to write this dataframe as Avro, like this:
newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")
I get the following error:
WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
But if I drop this new column, it is possible to write the dataframe.
What am I missing when writing the dataframe? Is the UDF changing something in the schema that I am not aware of?
Upvotes: 1
Views: 516
Reputation: 35219
Your code gives an NPE in the UDF call. The function you use is not null-safe; it will fail if:

- inputSeq is null.
- any element of inputSeq is null.
- document_number is null in any element of inputSeq.

It would also fail if any item was missing (although that is not a problem here). You have to include proper checks, starting with something like this (not tested):
def number_length(num: String): String = num match {
  case null => null
  case _ => if (num.length < 6) "000000" else num
}

def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => inputSeq match {
  case null => null
  case xs => xs.map {
    case null => null
    case x => Map(
      "source" -> x("source"),
      "document_number" -> number_length(x("document_number")),
      "first_seen" -> x("first_seen")
    )
  }
})
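With these null-safe versions in place, the column can be regenerated and the write retried exactly as in the question (again untested, reusing the same names as above):

val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))
newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")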
Why do I get an NPE with write.mode(SaveMode.Overwrite) even if the dataframe allows other actions like first or show?
Because both first and show evaluate only a subset of the data and clearly don't hit the problematic row.
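If you want to confirm this, one rough way (a sketch, not tested) is to force a full scan and look for the values the original UDF would choke on, for example null arrays or null document_number entries:

import org.apache.spark.sql.functions.explode

// Rows where the whole DOCUMENT array is null.
DF.filter($"DOCUMENT".isNull).count()

// Elements of DOCUMENT with a null (or missing) document_number key.
DF.select($"UUID", explode($"DOCUMENT").as("doc"))
  .filter($"doc".getItem("document_number").isNull)
  .show()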
Upvotes: 2