Reputation: 61
My JSON column names are a mix of lower and upper case (e.g. title/Title and name/Name), so in the output I get name and Name as two different columns (and likewise title and Title).
How can I make the JSON columns case-insensitive?
I tried config("spark.sql.caseSensitive", "true"), but it did not work.
import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

val df = Seq(
  ("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
  ("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
  ("E", "F", "{\"name\":\"pqr\",\"Address\":\"MNN\",\"Title\":\"bi\"}")
).toDF("col_1", "col_2", "col_json")

// Infer the JSON schema from the string column, then parse with it
val col_schema = spark.read.json(df.select("col_json").as[String]).schema
val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema))
  .select("col_1", "col_2", "new_col.*")
outputDF.show(false)
Expected/Needed output (column names to be case-insensitive):
Upvotes: 3
Views: 2682
Reputation: 32700
You can group the columns by their lowercase names and merge each group using the coalesce function:
// set spark.sql.caseSensitive to true to avoid ambiguity
spark.conf.set("spark.sql.caseSensitive", "true")
val col_schema = spark.read.json(df.select("col_json").as[String]).schema
val df1 = df.withColumn("new_col", from_json(col("col_json"), col_schema))
.select("col_1", "col_2", "new_col.*")
val mergedCols = df1.columns.groupBy(_.toLowerCase).values
.map(grp =>
if (grp.size > 1) coalesce(grp.map(col): _*).as(grp(0))
else col(grp(0))
).toSeq
val outputDF = df1.select(mergedCols:_*)
outputDF.show()
//+----+-------+-----+-----+-----+
//|Name|Address|col_1|Title|col_2|
//+----+-------+-----+-----+-----+
//|xyz |NYC |A |engg |B |
//|mnp |MIC |C |data |D |
//|pqr |MNN |E |bi |F |
//+----+-------+-----+-----+-----+
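The grouping step above can be tried without a Spark session. The following is a minimal plain-Scala sketch, not part of the original answer: the object name MergeByLowercase, the helper names, and the hard-coded column list are illustrative assumptions, and firstDefined only mimics what coalesce does for a single row (take the first non-null value).

```scala
object MergeByLowercase {
  // Illustrative column list (an assumption): the names the parsed struct
  // would contribute for the sample data, plus the two plain columns.
  val columns: Seq[String] =
    Seq("col_1", "col_2", "Address", "Name", "Title", "name", "title")

  // Names that differ only by case end up in the same group.
  def groupByLowercase(names: Seq[String]): Map[String, Seq[String]] =
    names.groupBy(_.toLowerCase)

  // Plain-Scala stand-in for coalesce on one row: first defined value, if any.
  def firstDefined(values: Seq[Option[String]]): Option[String] =
    values.flatten.headOption

  def main(args: Array[String]): Unit = {
    val groups = groupByLowercase(columns)
    println(groups("name")) // List(Name, name)

    // Third sample row: Name is missing, name is "pqr".
    println(firstDefined(Seq(None, Some("pqr")))) // Some(pqr)
  }
}
```

Because the output column takes the first name in each group, the merged column is called Name or name depending on the order of df1.columns.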
Another way is to parse the JSON string column into a MapType instead of a StructType, lowercase the keys with transform_keys, then explode the map and pivot to get the columns:
import org.apache.spark.sql.functions.{col, explode, expr, first, from_json}
import org.apache.spark.sql.types.{MapType, StringType}
val outputDF = df.withColumn(
"col_json",
from_json(col("col_json"), MapType(StringType, StringType))
).select(
col("col_1"),
col("col_2"),
explode(expr("transform_keys(col_json, (k, v) -> lower(k))"))
).groupBy("col_1", "col_2")
.pivot("key")
.agg(first("value"))
outputDF.show()
//+-----+-----+-------+----+-----+
//|col_1|col_2|address|name|title|
//+-----+-----+-------+----+-----+
//|E |F |MNN |pqr |bi |
//|C |D |MIC |mnp |data |
//|A |B |NYC |xyz |engg |
//+-----+-----+-------+----+-----+
Note that transform_keys is only available since Spark 3; for older versions you can use a UDF instead:
val mapKeysToLower = udf((m: Map[String, String]) => {
m.map { case (k, v) => k.toLowerCase -> v }
})
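With that UDF, the explode step would presumably become explode(mapKeysToLower(col("col_json"))) in place of the transform_keys expression. The function body itself can be checked in plain Scala; the sketch below is only an illustration, and the object name LowercaseKeys and the method name keysToLower are assumptions, not names from the answer.

```scala
object LowercaseKeys {
  // Same body as the UDF above, kept Spark-free so it can run on its own.
  def keysToLower(m: Map[String, String]): Map[String, String] =
    m.map { case (k, v) => k.toLowerCase -> v }

  def main(args: Array[String]): Unit = {
    // Third sample row from the question, already parsed into a map.
    val row = Map("name" -> "pqr", "Address" -> "MNN", "Title" -> "bi")
    println(keysToLower(row))

    // Keys that differ only by case collapse into a single entry,
    // so one of the two values is silently dropped.
    println(keysToLower(Map("Name" -> "a", "name" -> "b")).size) // 1
  }
}
```

The second call shows a caveat of this approach: if one JSON object contains both Name and name, only one of the values survives the key lowercasing.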
Upvotes: 2
Reputation: 87299
You will need to merge your columns, using something like:
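import org.apache.spark.sql.functions.when
// df is the flattened DataFrame that has both title and Title columns;
// spark.sql.caseSensitive must be true so the two names resolve separately
val merged = df.withColumn("title", when($"title".isNull, $"Title").otherwise($"title")).drop("Title")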
Upvotes: 0