Reputation: 53
I have a dataframe df and a list of column names to select from this dataframe as a map. I have tried the following approach to build the map:
var df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
val cols = List("from_value","to_value")
df.select(
  map(
    lit(cols(0)), col(cols(0)),
    lit(cols(1)), col(cols(1))
  ).as("mapped")
).show(false)
Output:
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
However, I do see a few issues with this approach, such as:
- the number of column names can vary, so hardcoding cols(0) and cols(1) does not scale (and breaks on an empty list)
- the order of the keys in the map should follow the order of the column names
- null column values need to be handled
- a column name in the list may not exist in df
Is there an elegant way to handle the above scenarios without being too verbose?
Upvotes: 0
Views: 2753
Reputation: 5078
You can select certain columns of a dataframe as a map using the following function mappingExpr:
import org.apache.spark.sql.functions.{col, lit, map, when}
import org.apache.spark.sql.{Column, DataFrame}
def mappingExpr(columns: Seq[String], dataframe: DataFrame): Column = {
  // Replace a null column value with an empty string
  def getValue(columnName: String): Column =
    when(col(columnName).isNull, lit("")).otherwise(col(columnName))

  map(
    columns
      .filter(dataframe.columns.contains) // ignore names absent from the dataframe
      .flatMap(columnName => Seq(lit(columnName), getValue(columnName))): _*
  ).as("mapped")
}
So given your example's data:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
The main idea of my function is to transform the list of column names into a list of pairs, where the first element of each pair is the column name as a literal column and the second element is the column value as a column. I then flatten this list of pairs and pass the result to the map Spark SQL function.
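To make the flattening concrete, here is a minimal sketch (the entries value is just for illustration, not part of the function) of what the flatMap step produces for the example's two columns:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, map}

// For List("from_value", "to_value"), flatMap yields the alternating
// key1, value1, key2, value2 columns that map(...) expects as varargs
val entries: Seq[Column] = Seq("from_value", "to_value")
  .flatMap(name => Seq(lit(name), col(name)))
val mapped: Column = map(entries: _*)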
Let's now walk through your different constraints.

First, the number of column names. As I build the map entries by iterating over the list of column names, the number of names does not change anything. If we pass an empty list of column names, there is no error:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List()
>
> df.select(mappingExpr(cols, df)).show(false)
+------+
|mapped|
+------+
|[] |
|[] |
|[] |
|[] |
|[] |
|[] |
+------+
The second constraint, preserving the order of the column names, is the most tricky one. Usually when you create a map, the order of its keys is not preserved, due to how maps are implemented. However, Spark's map function keeps its entries in the order the key/value arguments are given, so the order only depends on the order of the list of column names. So in your example, if we change the order of the column names:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("to_value","from_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[to_value -> xyz1, from_value -> 66]|
|[to_value -> abc1, from_value -> 67]|
|[to_value -> fgr1, from_value -> 68]|
|[to_value -> yte1, from_value -> 69]|
|[to_value -> erx1, from_value -> 70]|
|[to_value -> ter1, from_value -> 71]|
+------------------------------------+
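If you instead want the keys to follow the dataframe's own column order, regardless of how the list is written, a small sketch (the orderedCols name is just for illustration) is to reorder the names before calling the function:

// Keep only the requested names, in the dataframe's column order
val orderedCols = df.columns.filter(cols.contains).toSeq
df.select(mappingExpr(orderedCols, df)).show(false)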
As for null values, I handle them in the inner function getValue by using the when Spark SQL function: when the column value is null, it returns an empty string, otherwise it returns the column value: when(col(columnName).isNull, lit("")).otherwise(col(columnName)). So when you have null values in your dataframe, they are replaced by empty strings:
> val df = Seq((66, null,"a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> ] |
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
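As a side note, a more compact way to write the same null handling, shown as a sketch under the assumption that you are fine coercing every value to a string, is the coalesce Spark SQL function:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{coalesce, col, lit}

// coalesce returns its first non-null argument, so this behaves like
// the when/otherwise expression above once values are cast to string
def getValue(columnName: String): Column =
  coalesce(col(columnName).cast("string"), lit(""))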
Finally, you can retrieve the list of columns of a dataframe with the method columns. I use this method to filter out all column names that are not in the dataframe, with the line .filter(dataframe.columns.contains). So when the list of column names contains a name that does not exist in the dataframe, it is ignored:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("a_column_that_does_not_exist", "from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
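If silently dropping unknown names is not what you want, a variant sketch (the mappingExprStrict name is hypothetical) could fail fast instead:

import org.apache.spark.sql.{Column, DataFrame}

// Raise an error when a requested column is missing,
// instead of silently ignoring it
def mappingExprStrict(columns: Seq[String], dataframe: DataFrame): Column = {
  val missing = columns.filterNot(dataframe.columns.contains)
  require(missing.isEmpty, s"Columns not found in dataframe: ${missing.mkString(", ")}")
  mappingExpr(columns, dataframe)
}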
Upvotes: 1