Reputation: 9158
I have the following UDF, which converts a time stored as a string into a timestamp:
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
import org.joda.time.format.DateTimeFormat

val hmsToTimeStampUdf = udf((dt: String) => {
  if (dt == null) null else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      new Timestamp(formatter.parseDateTime(dt).getMillis)
    } catch {
      case t: Throwable => throw new RuntimeException("hmsToTimeStampUdf,dt=" + dt, t)
    }
  }
})
This UDF is used to convert the String value into a Timestamp:
outputDf.withColumn(schemaColumn.name, hmsToTimeStampUdf(col(schemaColumn.name)))
But some CSV files contain invalid values in this column, which causes a RuntimeException. I want to find out which rows contain these broken records. Is it possible to access row information inside the UDF?
Upvotes: 1
Views: 170
Reputation: 14845
You can pass the row as a second input parameter to the UDF:
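import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
import org.joda.time.format.DateTimeFormat

val hmsToTimeStampUdf = udf((dt: String, r: Row) => {
  if (dt == null) null else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      new Timestamp(formatter.parseDateTime(dt).getMillis)
    } catch {
      case e: Exception =>
        println(r) // do some error handling; on a cluster this output goes to the executor logs
        null
    }
  }
})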
When calling the UDF, pass a struct containing all columns of the DataFrame as the second parameter (thanks to this answer):
df.withColumn("dt", hmsToTimeStampUdf(col("dt"), struct(df.columns.map(df(_)) : _*)))
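The core idea, giving the conversion the whole row so the failure handler can report which record was broken, can be tried outside Spark as a plain-Scala sketch. It is an illustration under assumptions, not the answer's code: Record (a Map standing in for a Spark Row), parseWithContext and the onError callback are hypothetical names, java.time is swapped in for Joda-Time, and returning Option instead of null is a choice made only for this sketch.

```scala
import java.sql.Timestamp
import java.time.{LocalDate, LocalTime}
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a Spark Row: column name -> raw string value.
type Record = Map[String, String]

val hms: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss")

// Parses dt as a time on 1970-01-01 (the date Joda-Time fills in when only a time is parsed).
// If dt is malformed, the whole record is handed to onError (the plain-Scala
// counterpart of println(r) in the UDF) and None is returned; null input also gives None.
def parseWithContext(dt: String, row: Record, onError: Record => Unit): Option[Timestamp] =
  Option(dt).flatMap { s =>
    try Some(Timestamp.valueOf(LocalTime.parse(s, hms).atDate(LocalDate.of(1970, 1, 1))))
    catch {
      case _: DateTimeParseException =>
        onError(row)
        None
    }
  }

val rows: Seq[Record] = Seq(
  Map("id" -> "1", "dt" -> "00:01:02"),
  Map("id" -> "2", "dt" -> "67:89:10")
)

// Collect the offending records through the callback while converting.
val broken = ListBuffer.empty[Record]
val parsed = rows.map(r => parseWithContext(r("dt"), r, bad => broken += bad))
println(parsed)
println(broken)
```

After it runs, broken holds the complete record for id 2, which is exactly the row-level context the question asks for.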
Upvotes: 1
Reputation: 9427
Instead of throwing a RuntimeException that aborts your CSV processing, a better approach may be to have the UDF return a tuple of (well-formed, corrupted) values. You can then easily separate good and bad rows by selecting the is null / is not null subsets.
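import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
import org.joda.time.format.DateTimeFormat
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

def safeConvert(dt: String): (Timestamp, String) = {
  if (dt == null)
    (null, null)
  else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      (new Timestamp(formatter.parseDateTime(dt).getMillis), null)
    } catch {
      case e: Exception =>
        (null, dt)
    }
  }
}
val safeConvertUDF = udf(safeConvert(_: String))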
val df = Seq(("00:01:02"), ("03:04:05"), ("67:89:10")).toDF("dt")
df.withColumn("temp", safeConvertUDF($"dt"))
  .withColumn("goodData", $"temp".getItem("_1"))
  .withColumn("badData", $"temp".getItem("_2"))
  .drop($"temp").show(false)
+--------+-------------------+--------+
|dt      |goodData           |badData |
+--------+-------------------+--------+
|00:01:02|1970-01-01 00:01:02|null    |
|03:04:05|1970-01-01 03:04:05|null    |
|67:89:10|null               |67:89:10|
+--------+-------------------+--------+
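The same split into well-formed and corrupted values can be sketched in plain Scala with Either, which encodes "exactly one of the two" directly. This is a hypothetical sketch rather than a Spark UDF: safeConvertEither is an illustrative name, java.time.LocalTime stands in for Joda-Time, and null input is not handled.

```scala
import java.time.LocalTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}

val hmsFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss")

// Right(time) for well-formed input, Left(raw value) for corrupted input, so exactly
// one side is populated, like the goodData/badData columns. Assumes non-null input;
// the answer's (null, null) branch for missing values is not modelled here.
def safeConvertEither(dt: String): Either[String, LocalTime] =
  try Right(LocalTime.parse(dt, hmsFormat))
  catch { case _: DateTimeParseException => Left(dt) }

val results = Seq("00:01:02", "03:04:05", "67:89:10").map(safeConvertEither)

// Plain-Scala counterpart of filtering on "badData is null" / "badData is not null".
val good = results.collect { case Right(t) => t }
val bad = results.collect { case Left(raw) => raw }
println(good)
println(bad)
```

Spark SQL has no Either type, so inside a UDF the answer's tuple, which Spark exposes as a struct with fields _1 and _2, remains the practical encoding.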
Upvotes: 2