Reputation: 664
Assume I have a partition that looks like this:
part1:
{"customerId":"1","name":"a"}
{"customerId":"2","name":"b"}
Assume I would like to change the schema of this to something like
{"data":{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}}
What I tried was defining these case classes:
case class Customer(customerId:Option[String],name:Option[String])
case class Customers(customers:Option[Seq[Customer]])
case class Datum(data:Option[Customers])
I tried reading the partition as JSON and converting it to a typed Dataset:
import spark.implicits._ // needed for the Datum encoder
val inputJson = spark.read.format("json").load("part1")
inputJson.as[Datum]
Somehow the DataFrame doesn't seem to infer the schema implicitly.
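For reference, printing the inferred schema shows the mismatch: Spark infers a flat schema from the files, so there is no data column for the Datum encoder to bind to, and the cast fails (the exact error message may vary by Spark version):

inputJson.printSchema
// root
//  |-- customerId: string (nullable = true)
//  |-- name: string (nullable = true)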
Upvotes: 3
Views: 363
Reputation: 7316
By using this structure, I believe you are hiding/wrapping the really useful information of your data. The only useful information here is: {"customerId":"1","name":"a"},{"customerId":"2","name":"b"}
The Datum and Customers wrappers just hide the data you really need. In order to access the data right now, you must first slightly change it to:
{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}
And then access this JSON with the following code:
case class Customer(customerId: String, name: String)
case class Data(customers: Array[Customer])

import spark.implicits._ // provides the encoder for the typed Dataset
val df = spark.read.json(path).as[Data]
If you try to print this DataFrame you get:
+----------------+
| customers|
+----------------+
|[[1, a], [2, b]]|
+----------------+
which of course is your data wrapped in arrays. Now comes the interesting part: in order to access this, you must do something like the following:
df.foreach{ data => data.customers.foreach(println _) }
This will print:
Customer(1,a)
Customer(2,b)
which is the real data that you need, but it is not easily accessible at all.
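If you need a flat, strongly typed collection rather than console output (note that println inside foreach runs on the executors, so on a cluster the output lands in the executor logs, not the driver console), one option is to flatten the nested arrays with flatMap. A minimal sketch, assuming df is the Dataset[Data] from above:

import spark.implicits._

// Flatten each row's Array[Customer] into a single Dataset[Customer]
val customers = df.flatMap(_.customers)
customers.show(false) // prints the two Customer rows as a two-column table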
EDIT:
Instead of using two classes I would use just one, the Customer class. Then leverage the built-in Spark functions for selecting inner JSON objects. Finally, you can explode each array of customers and generate a strongly typed Dataset of class Customer from the exploded column.
Here is the final code:
import org.apache.spark.sql.functions.explode
import spark.implicits._

case class Customer(customerId: String, name: String)

val path = "C:\\temp\\json_data.json"
val df = spark.read.json(path)

df.select(explode($"data.customers")) // one row per customer struct
  .map(r => Customer(r.getStruct(0).getString(0), r.getStruct(0).getString(1)))
  .show(false)
And the output:
+----------+----+
|customerId|name|
+----------+----+
|1 |a |
|2 |b |
+----------+----+
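As a variant on the map above, you can let Spark bind the struct fields to Customer by name rather than by position, which avoids the getStruct(0).getString(i) indexing. A sketch under the same case class and path:

import org.apache.spark.sql.functions.explode
import spark.implicits._

val typed = spark.read.json(path)
  .select(explode($"data.customers").as("customer")) // one row per customer struct
  .select("customer.*")                              // unwrap the struct into top-level columns
  .as[Customer]                                      // bind columns to the case class by name
typed.show(false)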
Upvotes: 2
Reputation: 664
I ended up manipulating the DataFrame itself:
import org.apache.spark.sql.functions.{col, collect_list, lit, struct}

val inputJson = spark.read.format("json").load("part1")
// Group on a constant column so collect_list gathers every row into one array
val formatted = inputJson.withColumn("dummy", lit(1)).groupBy("dummy")
  .agg(collect_list(struct(inputJson.col("*"))).alias("customers"))
// Wrap the array in a "data" struct and keep only that column
val finalFormatted = formatted.withColumn("data", struct(col("customers")))
  .select("data")
Now when I do
finalFormatted.printSchema
I get the schema that I need:
root
 |-- data: struct (nullable = false)
 |    |-- customers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- customerId: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
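As a sanity check, serializing a row back to JSON should reproduce the target shape; note that collect_list gives no ordering guarantee in general, though it holds for a single small partition like this:

finalFormatted.toJSON.show(false)
// prints something like:
// {"data":{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}}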
Upvotes: 0