Maria Livia

Reputation: 75

Spark - Drop null values from map column

I'm using Spark to read a CSV file and then gather all the fields to create a map. Some of the fields are empty and I'd like to remove them from the map.

So for a CSV that looks like this:

"animal", "colour", "age"
"cat"   , "black" ,
"dog"   ,         , "3"

I'd like to get a dataset with the following maps:

Map("animal" -> "cat", "colour" -> "black")
Map("animal" -> "dog", "age" -> "3")
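To pin down the target shape, here is a minimal plain-Scala sketch (no Spark) of the per-row transformation. It assumes the empty CSV fields arrive as null, which is what Spark's CSV reader produces for empty fields with its default `nullValue` setting. `TargetShape` and `toFieldMap` are hypothetical names used only for illustration.

```scala
// Plain-Scala sketch of the desired result; no Spark involved.
// Assumption: empty CSV fields have already been read as null.
object TargetShape {
  // Hypothetical helper: pair header names with one row's values and drop the null ones
  def toFieldMap(header: Seq[String], row: Seq[String]): Map[String, String] =
    header.zip(row).collect { case (name, value) if value != null => name -> value }.toMap

  val header: Seq[String] = Seq("animal", "colour", "age")
  val rows: Seq[Seq[String]] = Seq(Seq("cat", "black", null), Seq("dog", null, "3"))

  // One map per row, containing only the non-null fields
  val maps: Seq[Map[String, String]] = rows.map(row => toFieldMap(header, row))
}
```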

This is what I have so far:

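import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, map}

val csv = sparkSession.read
    .option("header", "true")
    .csv(csvLocation)

val csv_cols_n_vals: Array[Column] = csv.columns.flatMap { c => Array(lit(c), col(c)) }

val withMap = csv.withColumn("allFieldsMap", map(csv_cols_n_vals: _*))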

This builds the map, but the empty fields are kept in it with null values. I've tried a few variations, but I can't seem to find the correct solution.

Upvotes: 0

Views: 2844

Answers (5)

stack0114106

Reputation: 8711

Using the map_filter() function, available from Spark 3.0 onwards:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")

df.show(false)

+------+------+----+
|animal|colour|age |
+------+------+----+
|cat   |black |null|
|dog   |null  |3   |
+------+------+----+

val arr_map = df.columns.map( c => map(lit(c), col(c)) ).reduce(map_concat(_,_))

val df2 = df.withColumn("map_all",arr_map)

df2.createOrReplaceTempView("a_vw")

spark.sql("select *, map_filter(map_all, (k,v) -> v is not null) fmap from a_vw ").show(false)

+------+------+----+---------------------------------------------+--------------------------------+
|animal|colour|age |map_all                                      |fmap                            |
+------+------+----+---------------------------------------------+--------------------------------+
|cat   |black |null|{animal -> cat, colour -> black, age -> null}|{animal -> cat, colour -> black}|
|dog   |null  |3   |{animal -> dog, colour -> null, age -> 3}    |{animal -> dog, age -> 3}       |
+------+------+----+---------------------------------------------+--------------------------------+
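The same filtering can be done without a temp view. Since Spark 3.0, `org.apache.spark.sql.functions` also exposes `map_filter` taking a Scala lambda, so a minimal DataFrame-API sketch might look like this. It assumes spark-sql 3.x on the classpath and a local session; `MapFilterSketch` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, map, map_concat, map_filter}

object MapFilterSketch {
  // Local session for the sketch (assumption: spark-sql 3.x is on the classpath)
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("map-filter-sketch").getOrCreate()

  def result(): Seq[Map[String, String]] = {
    import spark.implicits._

    val df = Seq[(String, String, String)](("cat", "black", null), ("dog", null, "3"))
      .toDF("animal", "colour", "age")

    // Build one map holding every column, including the null values
    val allFields = df.columns.map(c => map(lit(c), col(c))).reduce(map_concat(_, _))

    // Keep only the entries whose value is non-null
    df.select(map_filter(allFields, (_, v) => v.isNotNull).as("fmap"))
      .as[Map[String, String]]
      .collect()
      .toSeq
  }
}
```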

Upvotes: 0

stack0114106

Reputation: 8711

Using foldLeft():

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")

df.show(false)

+------+------+----+
|animal|colour|age |
+------+------+----+
|cat   |black |null|
|dog   |null  |3   |
+------+------+----+


df.withColumn("allFieldsMap", df.columns.foldLeft(map()) { (acc, x) =>
  // append (x -> value) only when column x is non-null
  when(col(x).isNotNull, map_concat(acc, map(lit(x), col(x)))).otherwise(acc)
}).show(false)

+------+------+----+--------------------------------+
|animal|colour|age |allFieldsMap                    |
+------+------+----+--------------------------------+
|cat   |black |null|{animal -> cat, colour -> black}|
|dog   |null  |3   |{animal -> dog, age -> 3}       |
+------+------+----+--------------------------------+
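One caveat: according to the Spark 3.0 SQL migration notes, an argument-less `map()` now returns an empty map typed `map<null,null>` (Spark 2.4 used `map<string,string>`), so the fold above leans on Spark's type coercion. If you'd rather pin the type explicitly, one option (my assumption, not part of the answer above) is to seed the fold with `typedLit(Map.empty[String, String])`. `FoldLeftSketch` is a hypothetical name and spark-sql 3.x is assumed.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, map, map_concat, typedLit, when}

object FoldLeftSketch {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("foldleft-sketch").getOrCreate()

  def result(): Seq[Map[String, String]] = {
    import spark.implicits._

    val df = Seq[(String, String, String)](("cat", "black", null), ("dog", null, "3"))
      .toDF("animal", "colour", "age")

    // Explicitly typed empty map<string,string> as the fold's starting value
    val seed: Column = typedLit(Map.empty[String, String])

    // For each column, append (name -> value) only when the value is non-null
    val allFields = df.columns.foldLeft(seed) { (acc, c) =>
      when(col(c).isNotNull, map_concat(acc, map(lit(c), col(c)))).otherwise(acc)
    }

    df.select(allFields.as("allFieldsMap")).as[Map[String, String]].collect().toSeq
  }
}
```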

Upvotes: 0

stack0114106

Reputation: 8711

Spark SQL solution (it returns an array of single-entry maps per row rather than a single map):

val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")

df.show(false)

+------+------+----+
|animal|colour|age |
+------+------+----+
|cat   |black |null|
|dog   |null  |3   |
+------+------+----+

df.createOrReplaceTempView("a_vw")
val cols_str = df.columns.flatMap( x => Array("\"".concat(x).concat("\""),x)).mkString(",")

spark.sql(s""" 
select collect_list(m2) res from (
select id, key, value, map(key,value) m2 from (
select id, explode(m) as (key,value) from 
    ( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
    )
 where value is not null
) group by id
""")
.show(false)

+------------------------------------+
|res                                 |
+------------------------------------+
|[[animal -> cat], [colour -> black]]|
|[[animal -> dog], [age -> 3]]       |
+------------------------------------+

Or, more concisely, since collect_list skips the NULLs that the CASE expression produces for null values:

spark.sql(s""" 
select collect_list(case when value is not null then map(key,value) end ) res from (
select id, explode(m) as (key,value) from 
    ( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
) group by id
""")
.show(false)

+------------------------------------+
|res                                 |
+------------------------------------+
|[[animal -> cat], [colour -> black]]|
|[[animal -> dog], [age -> 3]]       |
+------------------------------------+
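Both queries above return an array of single-entry maps per row. If one map per row is wanted, as in the question, one option (a sketch under my own assumptions, not part of this answer) is to aggregate the surviving (key, value) pairs with `map_from_entries(collect_list(struct(key, value)))`, available since Spark 2.4. The sketch uses single-quoted SQL literals for the keys, and `SqlSingleMapSketch` is a hypothetical name. Rows are not guaranteed to come back in input order after the `group by`.

```scala
import org.apache.spark.sql.SparkSession

object SqlSingleMapSketch {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("sql-single-map-sketch").getOrCreate()

  def result(): Seq[Map[String, String]] = {
    import spark.implicits._

    val df = Seq[(String, String, String)](("cat", "black", null), ("dog", null, "3"))
      .toDF("animal", "colour", "age")
    df.createOrReplaceTempView("a_vw")

    // Builds: 'animal',animal,'colour',colour,'age',age  (arguments for map(...))
    val colsStr = df.columns.flatMap(c => Array(s"'$c'", c)).mkString(",")

    // Explode each row's map into (key, value) pairs, drop null values,
    // then rebuild a single map per row id
    spark.sql(s"""
      select map_from_entries(collect_list(struct(key, value))) as res
      from (
        select id, explode(m) as (key, value)
        from (select monotonically_increasing_id() as id, map($colsStr) as m from a_vw) src
      ) kv
      where value is not null
      group by id
    """).as[Map[String, String]].collect().toSeq
  }
}
```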

Upvotes: 0

s.polam

Reputation: 10362

scala> df.show(false)
+------+------+----+
|animal|colour|age |
+------+------+----+
|cat   |black |null|
|dog   |null  |3   |
+------+------+----+

Building the expression:

import org.apache.spark.sql.functions._

val colExpr = df
.columns // getting list of columns from dataframe.
.map{ columnName =>
    when(
        col(columnName).isNotNull, // checking if column is not null
        map(
            lit(columnName),
            col(columnName)
        ) // Adding column name and its value inside map
    )
    .otherwise(map())
}
.reduce(map_concat(_,_)) 
// finally using map_concat function to concat map values.

The code above builds the following expression:

map_concat(
    map_concat(
        CASE WHEN (animal IS NOT NULL) THEN map(animal, animal) ELSE map() END,
        CASE WHEN (colour IS NOT NULL) THEN map(colour, colour) ELSE map() END
    ),
    CASE WHEN (age IS NOT NULL) THEN map(age, age) ELSE map() END
)

Applying colExpr to the DataFrame:

df
.withColumn("allFieldsMap",colExpr)
.show(false)

+------+------+----+--------------------------------+
|animal|colour|age |allFieldsMap                    |
+------+------+----+--------------------------------+
|cat   |black |null|[animal -> cat, colour -> black]|
|dog   |null  |3   |[animal -> dog, age -> 3]       |
+------+------+----+--------------------------------+
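The expression tree can be checked without running a query: printing a `Column` renders its (still unresolved) expression as SQL-like text. A minimal sketch, assuming the column names are hard-coded instead of taken from `df.columns` (so no `SparkSession` is needed); `InspectColExpr` is a hypothetical name, and the exact rendering may differ between Spark versions.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, map, map_concat, when}

object InspectColExpr {
  // Assumption: column names hard-coded rather than read from df.columns
  val columns: Seq[String] = Seq("animal", "colour", "age")

  // Same construction as colExpr above: one conditional single-entry map per column
  val colExpr: Column = columns
    .map { c => when(col(c).isNotNull, map(lit(c), col(c))).otherwise(map()) }
    .reduce(map_concat(_, _))

  // Column.toString renders the nested map_concat / CASE WHEN expression as text
  def describe(): String = colExpr.toString
}

object InspectColExprMain extends App {
  println(InspectColExpr.describe())
}
```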

Upvotes: 1

Travis Hegner

Reputation: 2495

There is almost certainly a better and more efficient way using the DataFrame API, but here is a map/flatMap solution:

import spark.implicits._

val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")
val cols = df.columns

df.map(r => {
   cols.flatMap( c => {
       val v = r.getAs[String](c)
       if (v != null) {
           Some(Map(c -> v))
       } else {
           None
       }
   }).foldLeft(Map.empty[String, String])(_ ++ _) // empty Map for an all-null row; reduce would throw
}).toDF("map").show(false)

Which produces:

+--------------------------------+
|map                             |
+--------------------------------+
|[animal -> cat, colour -> black]|
|[animal -> dog, age -> 3]       |
+--------------------------------+
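For comparison, the same map-based idea can be written against a typed `Dataset`, where nullable fields surface as `Option` and the null check becomes a pattern match. This is a sketch, not part of the answer above: the `Animal` case class is hypothetical (it simply mirrors the CSV header), and spark-sql is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the CSV header; missing values are None
case class Animal(animal: Option[String], colour: Option[String], age: Option[String])

object TypedMapSketch {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("typed-map-sketch").getOrCreate()

  def result(): Seq[Map[String, String]] = {
    import spark.implicits._

    val ds = Seq(
      Animal(Some("cat"), Some("black"), None),
      Animal(Some("dog"), None, Some("3"))
    ).toDS()

    // Keep only the fields that are present; an all-None row yields an empty Map
    ds.map { a =>
      Seq("animal" -> a.animal, "colour" -> a.colour, "age" -> a.age)
        .collect { case (name, Some(value)) => name -> value }
        .toMap
    }.collect().toSeq
  }
}
```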

Upvotes: 2
