Nats

Reputation: 191

How to merge map columns in Spark SQL?

I have two Map type columns in a DataFrame. Is there a way to create a new Map column that merges these two columns in Spark SQL using .withColumn?

val sampleDF = Seq(
 ("Jeff", Map("key1" -> "val1"), Map("key2" -> "val2"))
).toDF("name", "mapCol1", "mapCol2")

sampleDF.show()

+----+-----------------+-----------------+
|name|          mapCol1|          mapCol2|
+----+-----------------+-----------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|
+----+-----------------+-----------------+

Upvotes: 2

Views: 9261

Answers (3)

mrsrinivas

Reputation: 35404

Use a UDF only when there is no built-in function for your use case. Built-in functions generally perform better because the optimizer can see inside them, while a UDF is opaque to it.

Spark version 2.4 and above

import org.apache.spark.sql.functions.{map_concat, col}

sampleDF.withColumn("map_concat", map_concat(col("mapCol1"), col("mapCol2"))).show(false)

Outputs

+----+-----------------+-----------------+-------------------------------+
|name|mapCol1          |mapCol2          |map_concat                     |
+----+-----------------+-----------------+-------------------------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|Map(key1 -> val1, key2 -> val2)|
+----+-----------------+-----------------+-------------------------------+
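
Since the question mentions Spark SQL, here is a minimal sketch of reaching the same built-in through a SQL expression instead of the Scala function. The view name `sample`, the column name `merged`, and the local SparkSession setup are my own assumptions for illustration; in spark-shell the session and implicits already exist.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Assumed local session; skip this in spark-shell, where `spark` is predefined.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("map-concat-sql")
  .getOrCreate()
import spark.implicits._

val sampleDF = Seq(
  ("Jeff", Map("key1" -> "val1"), Map("key2" -> "val2"))
).toDF("name", "mapCol1", "mapCol2")

// Same built-in function, written as a SQL expression string (Spark 2.4+).
val merged = sampleDF.withColumn("merged", expr("map_concat(mapCol1, mapCol2)"))
merged.show(false)

// Or as a plain SQL query against a temporary view (hypothetical view name).
sampleDF.createOrReplaceTempView("sample")
spark.sql("SELECT name, map_concat(mapCol1, mapCol2) AS merged FROM sample").show(false)
```

Both forms should resolve to the same built-in as the Scala `map_concat` call, so the choice is mostly a matter of style.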

Spark versions below 2.4

Create a UDF as in @RameshMaharjan's answer to this question. I added a null check, because without it a null map in either column causes a NullPointerException at runtime, which eventually fails the job.

import org.apache.spark.sql.functions.{udf, col}

val map_concat = udf((map1: Map[String, String],
                      map2: Map[String, String]) =>
  if (map1 == null) {
    map2
  } else if (map2 == null) {
    map1
  } else {
    map1 ++ map2
  })

sampleDF.withColumn("map_concat", map_concat(col("mapCol1"), col("mapCol2")))
 .show(false)
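
To see what the null check buys you without starting a Spark job, the UDF body can be pulled out into a plain function and exercised directly. This is only a sketch: `mergeMaps` is a hypothetical helper name, not part of Spark.

```scala
// Hypothetical helper: the same null-safe logic as the UDF body above,
// written as a plain Scala function so it can be checked on its own.
def mergeMaps(map1: Map[String, String],
              map2: Map[String, String]): Map[String, String] =
  if (map1 == null) map2
  else if (map2 == null) map1
  else map1 ++ map2

// Disjoint keys: both entries survive.
val disjoint = mergeMaps(Map("key1" -> "val1"), Map("key2" -> "val2"))

// One side null: the other side is returned unchanged.
val leftNull = mergeMaps(null, Map("key2" -> "val2"))

// Both sides null: the result is still null, no exception is thrown.
val bothNull = mergeMaps(null, null)

// Same key on both sides: `++` keeps the value from the right-hand map.
val overlap = mergeMaps(Map("k" -> "left"), Map("k" -> "right"))

println(disjoint)
println(overlap)
```

Note that with overlapping keys the value from the second map wins, so the argument order matters if the two columns can share keys.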

Upvotes: 5

Abhi

Reputation: 6568

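You can use struct to combine the two columns into a single column. Note that the result is a struct holding both maps, not one merged map.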

val sampleDF = Seq(
 ("Jeff", Map("key1" -> "val1"), Map("key2" -> "val2"))
).toDF("name", "mapCol1", "mapCol2")

sampleDF.show()

+----+-----------------+-----------------+
|name|          mapCol1|          mapCol2|
+----+-----------------+-----------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|
+----+-----------------+-----------------+

import org.apache.spark.sql.functions.struct

sampleDF.withColumn("NewColumn", struct(sampleDF("mapCol1"), sampleDF("mapCol2"))).take(2)
    res17: Array[org.apache.spark.sql.Row] = Array([Jeff,Map(key1 -> val1),Map(key2 -> val2),[Map(key1 -> val1),Map(key2 -> val2)]])

+----+-----------------+-----------------+--------------------+
|name|          mapCol1|          mapCol2|           NewColumn|
+----+-----------------+-----------------+--------------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|[Map(key1 -> val1...|
+----+-----------------+-----------------+--------------------+

Reference : How to merge two columns of a `Dataframe` in Spark into one 2-Tuple?

Upvotes: -3

Ramesh Maharjan

Reputation: 41957

You can write a UDF that merges both columns into one and apply it with withColumn, as below:

import org.apache.spark.sql.functions._
// Note: map1 ++ map2 throws a NullPointerException if either column is null.
val mergeUdf = udf((map1: Map[String, String], map2: Map[String, String]) => map1 ++ map2)

sampleDF.withColumn("merged", mergeUdf(col("mapCol1"), col("mapCol2"))).show(false)

which should give you

+----+-----------------+-----------------+-------------------------------+
|name|mapCol1          |mapCol2          |merged                         |
+----+-----------------+-----------------+-------------------------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|Map(key1 -> val1, key2 -> val2)|
+----+-----------------+-----------------+-------------------------------+

I hope the answer is helpful

Upvotes: 5
