medloh

Reputation: 969

Spark 2.2 Scala DataFrame select from string array, catching errors

I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.

I'm trying to build some dynamic SQL from a Scala String array. I need to re-type some columns in my DataFrame, but I won't know exactly which ones to retype until runtime, when I can see the set of columns in the DataFrame. So I'm trying to do this:

val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select(...) or df.selectExpr(...) // how do I invoke this with the vals from my string array??

typedCols will end up being an array of strings with values like this:

["a", "cast(b as int) b", "c"]

Do I need to first build one big comma-delimited string from that array?

So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame into a new DataFrame with my desired types. But some of the records in the DataFrame will have errors and will fail the attempted re-typing.

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket? Would I need to do a validation pass first before attempting the DataFrame select?

Upvotes: 2

Views: 4954

Answers (2)

Abdennacer Lachiheb

Reputation: 4888

The other answer is good, but it's missing another simple solution:

import org.apache.spark.sql.functions.col

val columns = Array("name", "age")
val df2 = df.select(columns.map(col): _*)
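
That covers plain column names. If the entries are SQL expression strings like the question's cast(b as int) b, the same pattern works with expr in place of col:

import org.apache.spark.sql.functions.expr

val typedCols = Array("a", "cast(b as int) b", "c")
val typed = df.select(typedCols.map(expr): _*)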

Upvotes: 2

Alper t. Turker

Reputation: 35249

You can just use variadic arguments:

import spark.implicits._  // assumes a SparkSession in scope as `spark` (pre-imported in spark-shell)

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|  a|   1|  c|
|foo|null|baz|
+---+----+---+

but personally I prefer columns:

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?

Data that fails to cast becomes NULL. To find the good records, use na.drop:

val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
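
With the sample data above, good keeps only the row whose cast succeeded:

good.show

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  a|  1|  c|
+---+---+---+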

To find the bad records, check whether any column is NULL:

import org.apache.spark.sql.functions.col

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
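
With the sample data, that leaves the row where the cast produced null:

bad.show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|foo|null|baz|
+---+----+---+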

To get the unmatched source rows (a combined helper is sketched after this list):

  • If typedCols is a Seq[Column] you can:

    df.where(typedCols.map(_.isNull).reduce(_ || _))

  • If typedCols is a Seq[String] you can:

    import org.apache.spark.sql.functions.expr

    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))

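Putting those pieces together, here is a small helper of my own (a sketch, not part of the Spark API) that returns the typed good rows alongside the original bad rows, assuming typedCols is a Seq[String] of SQL expressions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// good: rows where every expression evaluates non-null, in typed form
// bad: original rows where any expression comes out null
// (note this also catches rows that were already null before the cast)
def splitByCast(df: DataFrame, typedCols: Seq[String]): (DataFrame, DataFrame) = {
  val anyNull = typedCols.map(expr(_).isNull).reduce(_ || _)
  (df.selectExpr(typedCols: _*).na.drop(), df.where(anyNull))
}

val (good, bad) = splitByCast(df, Seq("a", "cast(b as int) b", "c"))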

Upvotes: 4
