AnumNuma

Reputation: 61

Merge JSON column names case-insensitively

My JSON column names are a mix of lowercase and uppercase (e.g. title/Title and name/Name), so in the output I get name and Name as two different columns (and similarly title and Title).

How can I treat the JSON column names as case-insensitive?

config("spark.sql.caseSensitive", "true") -> I tried this, but it is not working.

import spark.implicits._
import org.apache.spark.sql.functions.{col, from_json}

val df = Seq(
  ("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
  ("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
  ("E", "F", "{\"name\":\"pqr\",\"Address\":\"MNN\",\"Title\":\"bi\"}")
).toDF("col_1", "col_2", "col_json")

val col_schema = spark.read.json(df.select("col_json").as[String]).schema

val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema))
       .select("col_1", "col_2", "new_col.*")

outputDF.show(false)

Current output (name/Name and title/Title come out as separate columns):
[screenshot]

Expected/Needed output (the duplicate columns merged, case-insensitively):
[screenshot]

Upvotes: 3

Views: 2682

Answers (2)

blackbishop

Reputation: 32700

Solution 1

You can group the columns by their lowercase names and merge them using the coalesce function:

import org.apache.spark.sql.functions.{coalesce, col, from_json}

// set spark.sql.caseSensitive to true to avoid ambiguity
spark.conf.set("spark.sql.caseSensitive", "true")

val col_schema = spark.read.json(df.select("col_json").as[String]).schema

val df1 = df.withColumn("new_col", from_json(col("col_json"), col_schema))
     .select("col_1", "col_2", "new_col.*")


val mergedCols = df1.columns.groupBy(_.toLowerCase).values
  .map(grp =>
    if (grp.size > 1) coalesce(grp.map(col): _*).as(grp(0))
    else col(grp(0))
  ).toSeq

val outputDF = df1.select(mergedCols:_*)

outputDF.show()
//+----+-------+-----+-----+-----+
//|Name|Address|col_1|Title|col_2|
//+----+-------+-----+-----+-----+
//|xyz |NYC    |A    |engg |B    |
//|mnp |MIC    |C    |data |D    |
//|pqr |MNN    |E    |bi   |F    |
//+----+-------+-----+-----+-----+

Solution 2

Another way is to parse the JSON string column into a MapType instead of a StructType. Using transform_keys you can lowercase the keys, then explode the map and pivot to get the columns back:

import org.apache.spark.sql.functions.{col, explode, expr, first, from_json}
import org.apache.spark.sql.types.{MapType, StringType}

val outputDF = df.withColumn(
    "col_json",
    from_json(col("col_json"), MapType(StringType, StringType))
  ).select(
    col("col_1"),
    col("col_2"),
    explode(expr("transform_keys(col_json, (k, v) -> lower(k))"))
  ).groupBy("col_1", "col_2")
  .pivot("key")
  .agg(first("value"))

outputDF.show()
//+-----+-----+-------+----+-----+
//|col_1|col_2|address|name|title|
//+-----+-----+-------+----+-----+
//|E    |F    |MNN    |pqr |bi   |
//|C    |D    |MIC    |mnp |data |
//|A    |B    |NYC    |xyz |engg |
//+-----+-----+-------+----+-----+

Note that transform_keys is only available since Spark 3; for older versions you can use a UDF instead:

import org.apache.spark.sql.functions.udf

val mapKeysToLower = udf((m: Map[String, String]) => {
  m.map { case (k, v) => k.toLowerCase -> v }
})
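
For completeness, a rough sketch of plugging that UDF into the same pipeline in place of the transform_keys expression (imports as in Solution 2 above; the structure is otherwise identical):

// same pipeline as Solution 2, with the UDF lowercasing the map keys
// instead of transform_keys
val outputDF = df.withColumn(
    "col_json",
    from_json(col("col_json"), MapType(StringType, StringType))
  ).select(
    col("col_1"),
    col("col_2"),
    explode(mapKeysToLower(col("col_json")))
  ).groupBy("col_1", "col_2")
  .pivot("key")
  .agg(first("value"))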

Upvotes: 2

Alex Ott

Reputation: 87299

You will need to merge your columns, using something like:

import org.apache.spark.sql.functions.when

val df2 = df.withColumn("title", when($"title".isNull, $"Title").otherwise($"title"))
  .drop("Title")
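
Applied to the question's example (both name/Name and title/Title), a rough sketch might look like the following; parsedDF is a stand-in for the DataFrame obtained after from_json and select("new_col.*"), and it assumes spark.sql.caseSensitive is set to true so both spellings can be referenced separately:

// merge each pair of duplicate columns, then drop the uppercase variants
val merged = parsedDF
  .withColumn("title", when($"title".isNull, $"Title").otherwise($"title"))
  .withColumn("name", when($"name".isNull, $"Name").otherwise($"name"))
  .drop("Title", "Name")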

Upvotes: 0
