Ashika Umanga Umagiliya

Reputation: 9158

Spark : Access Row inside an UDF

I have the following UDF, which converts a time stored as a string into a timestamp.

  val hmsToTimeStampUdf = udf((dt: String) => {
    if (dt == null) null else {
      val formatter = DateTimeFormat.forPattern("HH:mm:ss")
      try {
        new Timestamp(formatter.parseDateTime(dt).getMillis)
      } catch {
        case t: Throwable => throw new RuntimeException("hmsToTimeStampUdf,dt="+dt, t)
      }
    }
  })

The UDF is applied to a String column to produce a Timestamp column:

outputDf.withColumn(schemaColumn.name, hmsToTimeStampUdf(col(schemaColumn.name)))

But some CSV files have invalid values in this column, which causes a RuntimeException. I want to find which rows contain these broken records. Is it possible to access row information inside the UDF?

Upvotes: 1

Views: 170

Answers (2)

werner

Reputation: 14845

You can add the row as a second input parameter to the UDF:

val hmsToTimeStampUdf = udf((dt: String, r: Row) => {
  if (dt == null) null else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      new Timestamp(formatter.parseDateTime(dt).getMillis)
    } catch {
      case t: Throwable => {
        println(r) //do some error handling
        null
      }
    }
  }
})

When calling the UDF, pass a struct containing all columns of the DataFrame as the second parameter (thanks to this answer):

df.withColumn("dt", hmsToTimeStampUdf(col("dt"), struct(df.columns.map(df(_)) : _*)))
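One caveat: println inside a UDF runs on the executors, so on a cluster the output goes to the executor logs rather than to the driver console. A minimal sketch of an alternative, assuming an extra column is acceptable: a second UDF returns the offending row as text when parsing fails and NULL otherwise, so the broken rows can be filtered on the driver. The names describeBadRow, isValidHms and dt_error are hypothetical, the sample data is made up, and java.time is used in place of Joda-Time only to keep the sketch dependency-free.

```scala
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import scala.util.Try

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

object BadRowReport {
  // True when the value parses as HH:mm:ss (java.time stands in for Joda-Time here).
  def isValidHms(dt: String): Boolean =
    Try(LocalTime.parse(dt, DateTimeFormatter.ofPattern("HH:mm:ss"))).isSuccess

  // Hypothetical UDF: the whole row rendered as text when dt is malformed,
  // otherwise None, which Spark stores as SQL NULL.
  val describeBadRow = udf((dt: String, r: Row) =>
    if (dt == null || isValidHms(dt)) None else Some(r.mkString("[", ", ", "]")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("bad-rows").getOrCreate()
    import spark.implicits._

    // Made-up sample data: one valid and one invalid time string.
    val df = Seq((1, "00:01:02"), (2, "67:89:10")).toDF("id", "dt")

    // Pass the column to check plus a struct of all columns, as in the answer above.
    val report = df.withColumn("dt_error",
      describeBadRow(col("dt"), struct(df.columns.map(col): _*)))

    // Only the rows whose dt value failed to parse.
    report.filter(col("dt_error").isNotNull).show(false)

    spark.stop()
  }
}
```

Individual fields can also be read from the Row inside the UDF, for example with r.getAs[Int]("id"), if only a key is needed rather than the whole row.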

Upvotes: 1

mazaneicha

Reputation: 9427

Instead of throwing a RuntimeException that kills your CSV parsing, a better approach may be to have the UDF return a tuple of (well-formed, corrupted) values. You can then easily separate good and bad rows by selecting the is null / is not null subsets.

def safeConvert(dt: String) : (Timestamp,String) = {
  if (dt == null) 
    (null,null) 
  else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      (new Timestamp(formatter.parseDateTime(dt).getMillis),null)
    } catch {
      case e:Exception => 
        (null,dt)
    }
  }
}
val safeConvertUDF = udf(safeConvert(_:String))

val df = Seq(("00:01:02"),("03:04:05"),("67:89:10")).toDF("dt")

df.withColumn("temp",safeConvertUDF($"dt"))
  .withColumn("goodData",$"temp".getItem("_1"))
  .withColumn("badData",$"temp".getItem("_2"))
  .drop($"temp").show(false)

+--------+-------------------+--------+
|dt      |goodData           |badData |
+--------+-------------------+--------+
|00:01:02|1970-01-01 00:01:02|null    |
|03:04:05|1970-01-01 03:04:05|null    |
|67:89:10|null               |67:89:10|
+--------+-------------------+--------+
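The "is null / is not null" split mentioned above could look roughly like the following. This is a sketch under assumptions, not the answer's exact code: it keeps the same (good, bad) tuple contract as safeConvert, but parses with java.time instead of Joda-Time so it runs without an extra dependency, and the object name SplitGoodBad and the val names are hypothetical.

```scala
import java.sql.Timestamp
import java.time.{LocalDate, LocalTime}
import java.time.format.DateTimeFormatter
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object SplitGoodBad {
  // Same contract as safeConvert above: (parsed, null) on success, (null, raw input) on failure.
  // Assumption of this sketch: java.time replaces Joda-Time, with the date fixed to 1970-01-01.
  def safeConvert(dt: String): (Timestamp, String) =
    if (dt == null) (null, null)
    else Try(LocalTime.parse(dt, DateTimeFormatter.ofPattern("HH:mm:ss"))) match {
      case Success(t) => (Timestamp.valueOf(t.atDate(LocalDate.of(1970, 1, 1))), null)
      case Failure(_) => (null, dt)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("split-good-bad").getOrCreate()
    import spark.implicits._

    val safeConvertUDF = udf(safeConvert _)

    // Unpack the tuple returned by the UDF into two named columns.
    val converted = Seq("00:01:02", "03:04:05", "67:89:10").toDF("dt")
      .withColumn("temp", safeConvertUDF($"dt"))
      .select($"dt", $"temp._1".as("goodData"), $"temp._2".as("badData"))

    // Rows whose value could not be parsed carry the raw input in badData.
    val badRows = converted.filter(col("badData").isNotNull)
    // Everything else, including rows whose input was null to begin with.
    val goodRows = converted.filter(col("badData").isNull).drop("badData")

    badRows.show(false)
    goodRows.show(false)

    spark.stop()
  }
}
```

Because both subsets derive from the same DataFrame, it may be worth caching converted before filtering so the UDF is not evaluated twice.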

Upvotes: 2
