Reputation: 27383
I'm struggling with handling null values in a UDF which operates on a DataFrame (which originates from a Hive table) consisting of a struct of floats:
The DataFrame (points) has the following schema:
root
|-- point: struct (nullable = true)
| |-- x: float (nullable = true)
| |-- y: float (nullable = true)
For example, I want to calculate the sum of x and y. Note that I do not "handle" null values in the following examples, but I want to be able to check in my UDF whether point, x, or y are null.
First approach:
val sum = udf((x: Float, y: Float) => x + y)
points.withColumn("sum", sum($"point.x", $"point.y"))
This does not work if the struct point is null; in that case the UDF is never evaluated (the code in the UDF is never executed!) and the result is null. Also, I cannot check x or y for being null, because Floats cannot be null in Scala.
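For reference, a minimal sketch that makes this behaviour visible (it reuses the points DataFrame above; the sumWithLog name and the println are just for illustration, and in local mode the output appears on the driver console):

import org.apache.spark.sql.functions.udf

// Same as the first approach, but with a side effect so we can see when the UDF actually runs.
val sumWithLog = udf((x: Float, y: Float) => { println(s"udf called with $x and $y"); x + y })
points.withColumn("sum", sumWithLog($"point.x", $"point.y")).show()
// As described above: for rows where point is null, nothing is printed
// and the sum column is null, because the UDF body is never executed for those rows.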
Second approach:
val sum = udf((pt: Row) => pt.getFloat(0) + pt.getFloat(1))
points.withColumn("sum", sum($"point"))
With this approach, I can check pt for null in my UDF, but I'm not able to check x and y, because Floats cannot be null. I get a NullPointerException in this case.
How can I write a UDF in which I can check the struct as well as x and y for being null?
I'm using Spark 1.6.1.
Update: In contrast to this question, I'm dealing with floats and not with strings (strings can be null in Scala, floats cannot).
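A quick illustration of this difference in plain Scala:

val s: String = null   // compiles: String is a reference type and can hold null
// val f: Float = null // does not compile: Float is a value type and cannot hold null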
Upvotes: 4
Views: 2910
Reputation: 1
from pyspark.sql.functions import col, when, lit, to_date

# Replace nulls with type-specific defaults instead of checking them inside a UDF.
def handle_default_values(df):
    for column, dtype in df.dtypes:
        if dtype == 'int':
            df = df.withColumn(column, when(col(column).isNull(), lit(-1)).otherwise(col(column)))
        elif dtype in ('float', 'double', 'decimal(18,2)'):
            df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))
        elif dtype == 'string':
            df = df.withColumn(column, when(col(column).isNull(), lit('UNK')).otherwise(col(column)))
        elif dtype == 'timestamp':
            df = df.withColumn(column, when(col(column).isNull(), to_date(lit('1900-01-01'))).otherwise(col(column)))
    return df
order_job_nullh = handle_default_values(order_job_trans)
display(order_job_nullh)
Upvotes: 0
Reputation: 13985
You can use Row.isNullAt(i) to check whether the i-th field is null. In your case, you should write your UDF as:
val sum = udf((point: Row) => point match {
  case p if (p.isNullAt(0) && p.isNullAt(1)) => 0f  // both x and y are null
  case p if p.isNullAt(0) => p.getFloat(1)          // only x is null
  case p if p.isNullAt(1) => p.getFloat(0)          // only y is null
  case p => p.getFloat(0) + p.getFloat(1)           // neither is null
})
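Applied to the points DataFrame from the question, the call is the same as in the second approach (a minimal usage sketch reusing the question's names):

points.withColumn("sum", sum($"point")).show()

If the struct point itself can also be null and is passed into the UDF as a null Row, a leading case null => 0f guard could be added before the field checks; that guard is an assumption on my part and not part of the original answer.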
Upvotes: 5