Rakshith

Reputation: 664

Merge multiple individual entries to single entry in Spark Dataframe

Assume I have a partition which looks like this:

part1:

{"customerId":"1","name":"a"}
{"customerId":"2","name":"b"}

Assume I would like to change the schema of this to something like:

{"data":{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}}

What I tried doing was:

case class Customer(customerId:Option[String],name:Option[String])
case class Customers(customers:Option[Seq[Customer]])
case class Datum(data:Option[Customers])

I tried reading the partition as JSON and converting it to a Dataframe:

val inputJson = spark.read.format("json").load("part1")
inputJson.as[Datum]

Somehow the Dataframe doesn't seem to implicitly pick up the schema of the case classes.
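
For reference, printing the schema shows what Spark actually infers from part1, a flat per-record schema with no top-level data column, which is presumably why the cast to Datum fails:

inputJson.printSchema
// root
//  |-- customerId: string (nullable = true)
//  |-- name: string (nullable = true)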

Upvotes: 3

Views: 363

Answers (2)

abiratsis

Reputation: 7316

By having this structure I believe you are hiding/wrapping the really useful information of your data. The only useful information here is {"customerId":"1","name":"a"},{"customerId":"2","name":"b"}; the customers and datum wrappers just hide the data you really need. In order to access the data right now, you must first slightly change your data to:

{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}

And then access this JSON with the following code:

case class Customer(customerId:String, name:String)
case class Data(customers: Array[Customer])

import spark.implicits._ // needed for the Data encoder (pre-imported in spark-shell)

val df = spark.read.json(path).as[Data]

If you print this dataframe you get:

+----------------+
|       customers|
+----------------+
|[[1, a], [2, b]]|
+----------------+

which of course is your data wrapped in an array. Now comes the interesting part: in order to access it, you must do something like the following:

df.foreach{ data => data.customers.foreach(println _) }

This will print:

Customer(1,a)
Customer(2,b)

which is the real data that you need, but it is not easily accessible at all.
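
If you would rather have the customers as a flat, strongly typed Dataset instead of printing them from inside foreach, a minimal sketch (reusing the Customer and Data case classes above) is to flatMap over the wrapped arrays:

import spark.implicits._

// flatten Dataset[Data] into Dataset[Customer]
df.flatMap(_.customers).show(false)

which should print something like:

+----------+----+
|customerId|name|
+----------+----+
|1         |a   |
|2         |b   |
+----------+----+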

EDIT:

Instead of using two classes I would use just one, the Customer class, and leverage the built-in Spark functions for selecting inner JSON objects. Finally, you can explode each array of customers and generate a strongly typed Dataset of Customer from the exploded column.

Here is the final code:

case class Customer(customerId:String, name:String)

import org.apache.spark.sql.functions.explode
import spark.implicits._ // for the $-notation and the Customer encoder

val path = "C:\\temp\\json_data.json"
val df = spark.read.json(path)

// one row per customer struct, then map each struct to a Customer
df.select(explode($"data.customers"))
  .map{ r => Customer(r.getStruct(0).getString(0), r.getStruct(0).getString(1)) }
  .show(false)

And the output:

+----------+----+
|customerId|name|
+----------+----+
|1         |a   |
|2         |b   |
+----------+----+
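
As a side note, the manual getStruct/getString mapping can be avoided: after the explode you can flatten the struct with a star select and cast straight to the case class. A sketch under the same assumptions:

import org.apache.spark.sql.functions.explode
import spark.implicits._

val customersDs = df
  .select(explode($"data.customers").as("c"))
  .select("c.*") // pulls customerId and name out of the struct
  .as[Customer]

customersDs.show(false)

This avoids relying on the positional getString(0)/getString(1) accessors, which silently break if the field order ever changes.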

Upvotes: 2

Rakshith

Reputation: 664

I ended up manipulating the dataframe itself:

import org.apache.spark.sql.functions.{col, collect_list, lit, struct}

val inputJson = spark.read.format("json").load("part1")

// collect every row into a single array column named "customers"
val formatted = inputJson.withColumn("dummy", lit(1)).groupBy("dummy")
  .agg(collect_list(struct(inputJson.col("*"))).alias("customers"))

// wrap the array in a "data" struct and keep only that column
val finalFormatted = formatted.withColumn("data", struct(col("customers")))
  .select("data")

Now when I do

finalFormatted.printSchema

I get the schema that I need:

root
 |-- data: struct (nullable = false)
 |    |-- customers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- customerId: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
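
As a quick sanity check that this really produces the JSON shape from the question, toJSON renders each row back as a JSON string:

finalFormatted.toJSON.first

// should print something like:
// {"data":{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}}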

Upvotes: 0
