Mysticmajic

Reputation: 53

Spark Scala: select certain columns in a dataframe as a map

I have a dataframe df and a list of column names to select from this dataframe as a map.

I have tried the following approach to build the map.

var df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")

val cols = List("from_value","to_value")

df.select(
  map(
    lit(cols(0)),col(cols(0))
    ,lit(cols(1)),col(cols(1))
  )
  .as("mapped")
  ).show(false)

Output:

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+

However, I see a few issues with this approach:

- The list of column names may contain 0 up to 3 column names
- I need the keys in the map to preserve the order
- The column value can be null and that would need to be coalesced to an empty string
- The column specified in the list may not exist in the dataframe

Is there an elegant way to handle the above scenarios without being too verbose?

Upvotes: 0

Views: 2753

Answers (1)

Vincent Doba

Reputation: 5078

You can select certain columns in a dataframe as a map using the following function mappingExpr:

import org.apache.spark.sql.functions.{col, lit, map, when}
import org.apache.spark.sql.{Column, DataFrame}

def mappingExpr(columns: Seq[String], dataframe: DataFrame): Column = {
  def getValue(columnName: String): Column = when(col(columnName).isNull, lit("")).otherwise(col(columnName))

  map(
    columns
      .filter(dataframe.columns.contains)
      .flatMap(columnName => Seq(lit(columnName), getValue(columnName))): _*
  ).as("mapped")
}  

So given your example's data:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+

Detailed explanation

The main idea of my function is to transform the list of columns into a list of pairs, where the first element of each pair contains the column name as a column and the second element contains the column value as a column. I then flatten this list of pairs and pass the result to the map Spark SQL function.
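The interleaving step can be sketched in plain Scala, with strings standing in for the Spark Column objects (lit(...) and col(...)), just to show the shape of the flattened list that map(...) receives:

```scala
// Plain-Scala sketch of the interleaving: each column name expands into
// a (key, value) pair, and the pairs are flattened into one list, which
// is the varargs shape that Spark's map(...) function expects.
val cols = List("from_value", "to_value")
val interleaved = cols.flatMap(c => Seq(s"lit($c)", s"col($c)"))

println(interleaved.mkString(", "))
// lit(from_value), col(from_value), lit(to_value), col(to_value)
```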

Let's now go through your different constraints one by one.

The list of column names may contain 0 up to 3 column names

As the map entries are built by iterating over the list of columns, the number of column names does not matter. If we pass an empty list of column names, there is no error:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List()
>
> df.select(mappingExpr(cols, df)).show(false)
+------+
|mapped|
+------+
|[]    |
|[]    |
|[]    |
|[]    |
|[]    |
|[]    |
+------+

I need the keys in the map to preserve the order

This is the trickiest one. Usually when you create a map, the order is not preserved, due to how maps are implemented. In Spark, however, the map column preserves insertion order, so the result depends only on the order of the list of column names. So in your example, if we change the order of the column names:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("to_value","from_value")
> 
> df.select(mappingExpr(cols, df)).show(false)

+------------------------------------+
|mapped                              |
+------------------------------------+
|[to_value -> xyz1, from_value -> 66]|
|[to_value -> abc1, from_value -> 67]|
|[to_value -> fgr1, from_value -> 68]|
|[to_value -> yte1, from_value -> 69]|
|[to_value -> erx1, from_value -> 70]|
|[to_value -> ter1, from_value -> 71]|
+------------------------------------+

The column value can be null and that would need to be coalesced to an empty string

I do that in the inner function getValue, using the when Spark SQL function: when the column value is null, return an empty string, otherwise return the column value: when(col(columnName).isNull, lit("")).otherwise(col(columnName)). So when you have null values in your dataframe, they are replaced by an empty string:

> val df = Seq((66, null,"a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> ]    |
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
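The same null-to-empty-string rule can be expressed in plain Scala with Option, which mirrors what the when/otherwise expression does (in Spark itself you could equivalently write coalesce(col(columnName), lit("")) using the coalesce function from org.apache.spark.sql.functions; this is a sketch, not the answer's exact code):

```scala
// Plain-Scala analogue of when(col(c).isNull, lit("")).otherwise(col(c)):
// wrap the possibly-null value in an Option and fall back to "".
def getValueOrEmpty(v: String): String = Option(v).getOrElse("")

println(getValueOrEmpty(null))   // empty string
println(getValueOrEmpty("abc1")) // abc1
```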

The column specified in the list may not exist in the dataframe

You can retrieve the list of columns of a dataframe with the columns method. I use it to filter out all column names that are not in the dataframe, with the line .filter(dataframe.columns.contains). So when the list of column names contains a name that is not in the dataframe, it is ignored:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("a_column_that_does_not_exist", "from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
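The filtering step itself does not need Spark; it is ordinary Scala collection code. A minimal sketch, with the dataframe's columns stubbed as a plain Array of names (in Spark this is what df.columns returns):

```scala
// The dataframe's columns, stubbed as a plain Array[String].
val dfColumns = Array("from_value", "to_value", "label")
val requested = List("a_column_that_does_not_exist", "from_value", "to_value")

// Keep only the requested names that actually exist in the dataframe.
val kept = requested.filter(dfColumns.contains)

println(kept) // List(from_value, to_value)
```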

Upvotes: 1
