I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.
I'm trying to build some dynamic SQL from a Scala String array. I want to re-type some columns in my DataFrame, but I won't know which ones need re-typing until runtime, when I can inspect the DataFrame's set of columns. So I'm trying to do this:
val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select( ...) or df.selectExpr(...) // how to invoke this with vals from my string array??
typedCols will end up being an array of strings with values like this:
["a", "cast(b as int) b", "c"]
Do I need to create a big comma delimited string first from that array?
So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame to a new DataFrame with my desired types. But some of those records in the DataFrame will have errors, and will fail the attempted re-typing.
How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket? Would I need to do a validation pass first before attempting the DataFrame select?
The accepted answer is good, but it is missing another simple solution (note that col must be imported):
import org.apache.spark.sql.functions.col

val columns = Array("name", "age")
val df2 = df.select(columns.map(col): _*)
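As to the question about whether a big comma-delimited string is needed first: it isn't. The `: _*` ascription splats an Array or Seq into a varargs parameter list, which is exactly what select and selectExpr accept. A minimal plain-Scala sketch of the mechanism (joinAll is a made-up varargs function standing in for select/selectExpr):

```scala
// `: _*` expands a collection into a varargs parameter list,
// so no manual comma-joining is required.
def joinAll(parts: String*): String = parts.mkString(", ")

val cols = Array("name", "age")
println(joinAll(cols: _*)) // prints "name, age"
```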
You can just use variadic arguments:
val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show
+---+----+---+
| a| b| c|
+---+----+---+
| a| 1| c|
|foo|null|baz|
+---+----+---+
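Since the typedCols array is only known at runtime, here is one way such an array might be derived from the DataFrame's columns. The getTypedColumn helper and the targetTypes map are assumptions for illustration, not part of any Spark API:

```scala
// Hypothetical getTypedColumn: targetTypes maps the column names that need
// re-typing to their SQL type; every other column passes through unchanged.
val targetTypes = Map("b" -> "int")

def getTypedColumn(c: String): String =
  targetTypes.get(c).map(t => s"cast($c as $t) $c").getOrElse(c)

val typedCols = Array("a", "b", "c").map(getTypedColumn)
// typedCols: Array("a", "cast(b as int) b", "c")
```

In Spark, the source array would come from df.columns rather than a literal.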
but personally I prefer Column objects (the $ interpolator requires import spark.implicits._ in scope):
val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?
Data that failed to cast is NULL. To find the good records, use na.drop:
val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
To find the bad records, check whether any column is NULL:
import org.apache.spark.sql.functions.col
val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
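The same good/bad split can be sketched in plain Scala without Spark, using partition in place of the two filters above (the tuple rows and the choice of "b" as the cast column mirror the earlier example and are illustrative only):

```scala
import scala.util.Try

// Rows whose "b" field parses as an Int are "good"; the rest land in the
// error bucket, analogous to na.drop plus the isNull filter above.
val rows = Seq(("a", "1", "c"), ("foo", "bar", "baz"))
val (goodRows, badRows) =
  rows.partition { case (_, b, _) => Try(b.toInt).isSuccess }
// goodRows: Seq(("a", "1", "c")), badRows: Seq(("foo", "bar", "baz"))
```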
To get the unmatched data from the original df:
If typedCols is a Seq[Column], you can use:
df.where(typedCols.map(_.isNull).reduce(_ || _))
If typedCols is a Seq[String], you can use:
import org.apache.spark.sql.functions.expr

df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
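The reduce(_ || _) idiom used above ORs an arbitrary number of conditions together. Here is the same shape on plain Boolean predicates rather than Spark Columns (the predicates themselves are arbitrary examples):

```scala
// reduce(_ || _) combines a collection of predicates with logical OR,
// the same pattern the answer applies to isNull checks on Columns.
val preds: Seq[String => Boolean] = Seq(_.isEmpty, _.length > 3)
val anyMatch: String => Boolean = s => preds.map(p => p(s)).reduce(_ || _)
// anyMatch("hello") == true, anyMatch("ab") == false
```

One caveat: reduce throws on an empty collection, so guard against a DataFrame with zero columns if that can occur.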