Reputation: 969
This question is related to this thread: Spark 2.2 Scala DataFrame select from string array, catching errors
I need to distinguish between records missing columns (which is not an error in my use case) and records having garbage values that don't work for the column type.
After I do my selectExpr, both of these situations appear as null in the resulting DataFrame. I'm looking for a fast way to keep the records with missing columns in the good results, while putting the records with garbage values in a bad bucket. The bad bucket would include things like an int field whose value is the empty string, or "abc".
For example, say I have a DataFrame like this (Col A: string, Col B: int, Col C: string):
A B C
"x", "", "" - Error, bad value for B
"", null,"" - Good, missing value for B
"", "a", "" - Bad, bad value for B
"x", "1", "x" - Good, normal case
----- edit -----
Showing code creating the dataframes. Data comes in as json, all fields are quoted so it initially thinks everything is a string. I need to type several of the fields to int, boolean, etc. See link at top for full details.
val cols = dfLower.columns
val typedCols = cols.map( c => getTypeStmtForCol(c, qradarType) )
val result = dfLower.selectExpr(typedCols: _*)
// This puts both records with missing columns and bad values in bad.
// Need way to distinguish between those 2 cases.
val bad = dfLower.where(typedCols.map(expr(_).isNull).reduce(_ || _))
val good = result.na.drop()
---- edit 2 ----
I think I may have an idea. If I could count the number of nulls in each record before and after, then only those records with more nulls after the select should go in error. Unsure how to implement that...
Upvotes: 0
Views: 2993
Reputation: 587
This is sort of quick and dirty, but you can create a UDF that tests your conditions and returns a status based on the outcome.
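import org.apache.spark.sql.functions.udf

// Classifies a string that is supposed to hold an int:
//   null (missing value) -> "GOOD", empty string -> "ERROR", other unparseable text -> "BAD"
val checkIntData = udf((columnData: String) => {
  var status = "GOOD"
  try {
    columnData.toInt
  } catch {
    case ex: Exception =>
      if (columnData == null) {
        // Do nothing. A missing value is fine.
      } else if (columnData.length == 0) {
        status = "ERROR"
      } else {
        status = "BAD"
      }
  }
  status
})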
// toDF needs the session implicits; spark-shell imports them automatically.
import spark.implicits._

val seqData = Seq(("x","","","0"),("",null,"","3"),("","a","","z"),("x","1","x",""))
val df = seqData.toDF("col1","col2","col3","col4")
val colsToCheck = Array("col2", "col4")
// Iterate over the columns you want to check. Each checked column adds a new status column.
val newdf = colsToCheck.foldLeft(df) { (acc, column) =>
  acc.withColumn(column + "Status", checkIntData(acc(column)))
}
newdf.show()
This will return the following:
+----+----+----+----+----------+----------+
|col1|col2|col3|col4|col2Status|col4Status|
+----+----+----+----+----------+----------+
|   x|    |    |   0|     ERROR|      GOOD|
|    |null|    |   3|      GOOD|      GOOD|
|    |   a|    |   z|       BAD|       BAD|
|   x|   1|   x|    |      GOOD|     ERROR|
+----+----+----+----+----------+----------+
You can then create your error bucket by filtering based on the status column.
Columns 1 through 3 are from your example. I added column 4 to show how to apply this to multiple columns without having to write .withColumn() a hundred times. I achieved this by creating an array of column names, colsToCheck, and then iterating over it to apply the udf to each selected column.
NOTE! Since I'll probably be yelled at for doing so I want to let you know that using try/catch as flow control is considered an anti-pattern (aka bad programming). Read more to find out why.
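If the try/catch bothers you, the same three outcomes can be computed with explicit checks first and the parse attempt last. This is only a sketch in plain Scala: the names IntCheck and classifyInt are made up for illustration and are not part of the code above, and it assumes the same rules (null is fine, empty string is ERROR, anything else that does not parse as an int is BAD).

```scala
import scala.util.Try

// Hypothetical helper, not from the answer above: same GOOD / ERROR / BAD
// outcomes as checkIntData, but without using catch as flow control.
object IntCheck {
  def classifyInt(columnData: String): String =
    if (columnData == null) "GOOD"                     // missing value: not an error
    else if (columnData.isEmpty) "ERROR"               // empty string where an int was expected
    else if (Try(columnData.toInt).isSuccess) "GOOD"   // parses cleanly as an int
    else "BAD"                                         // garbage such as "abc"
}
```

In Spark it should be possible to wrap this as udf(IntCheck.classifyInt _) and use it in place of checkIntData; I have not benchmarked it against the try/catch version.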
Upvotes: 1