Reputation: 554
I have a dataframe that looks like this:
------------------------------------------------------------------------
|name |meals |
------------------------------------------------------------------------
|Tom |{"breakfast": "banana", "lunch": "sandwich"} |
|Alex |{"breakfast": "yogurt", "lunch": "pizza", "dinner": "pasta"} |
|Lisa |{"lunch": "sushi", "dinner": "lasagna", "snack": "apple"} |
------------------------------------------------------------------------
Obtained from the following:
var rawDf = Seq(("Tom",s"""{"breakfast": "banana", "lunch": "sandwich"}""" ),
("Alex", s"""{"breakfast": "yogurt", "lunch": "pizza", "dinner": "pasta"}"""),
("Lisa", s"""{"lunch": "sushi", "dinner": "lasagna", "snack": "apple"}""")).toDF("name", "meals")
I want to transform it into a dataframe that looks like this:
------------------------------------------------------------------------
|name |meal |food |
------------------------------------------------------------------------
|Tom |breakfast | banana |
|Tom |lunch | sandwich |
|Alex |breakfast | yogurt |
|Alex |lunch | pizza |
|Alex |dinner | pasta |
|Lisa |lunch | sushi |
|Lisa |dinner | lasagna |
|Lisa |snack | apple |
------------------------------------------------------------------------
I'm using Spark 2.1, so I'm parsing the JSON using get_json_object. Currently, I'm trying to get the final dataframe using an intermediary dataframe that looks like this:
------------------------------------------------------------------------
|name |breakfast |lunch |dinner |snack |
------------------------------------------------------------------------
|Tom |banana |sandwich |null |null |
|Alex |yogurt |pizza |pasta |null |
|Lisa |null |sushi |lasagna |apple |
------------------------------------------------------------------------
Obtained from the following:
val intermediaryDF = rawDf.select(
  col("name"),
  get_json_object(col("meals"), "$." + Meals.breakfast).alias(Meals.breakfast),
  get_json_object(col("meals"), "$." + Meals.lunch).alias(Meals.lunch),
  get_json_object(col("meals"), "$." + Meals.dinner).alias(Meals.dinner),
  get_json_object(col("meals"), "$." + Meals.snack).alias(Meals.snack)
)
Meals is defined in another file that has a lot more entries than breakfast, lunch, dinner, and snack, but it looks something like this:
object Meals {
  val breakfast = "breakfast"
  val lunch = "lunch"
  val dinner = "dinner"
  val snack = "snack"
}
I then use intermediaryDF to compute the final DataFrame, like so:
val finalDF = intermediaryDF.where(col("breakfast").isNotNull)
    .select(col("name"), lit(Meals.breakfast).as("meal"), col("breakfast").as("food"))
  .union(intermediaryDF.where(col("lunch").isNotNull)
    .select(col("name"), lit(Meals.lunch).as("meal"), col("lunch").as("food")))
  .union(intermediaryDF.where(col("dinner").isNotNull)
    .select(col("name"), lit(Meals.dinner).as("meal"), col("dinner").as("food")))
  .union(intermediaryDF.where(col("snack").isNotNull)
    .select(col("name"), lit(Meals.snack).as("meal"), col("snack").as("food")))
Using the intermediary DataFrame works if I only have a few types of Meals, but I actually have 40, and enumerating every one of them to compute intermediaryDF is impractical. I also don't like the idea of having to compute this DF in the first place. Is there a way to get directly from my raw dataframe to the final dataframe without the intermediary step, and also without explicitly having a case for every value in Meals?
Upvotes: 0
Views: 363
Reputation: 2108
Apache Spark provides support for parsing JSON data, but the JSON needs a predefined schema in order to be parsed correctly. Your JSON keys are dynamic, so you cannot rely on a fixed schema.
One way around this is to not let Apache Spark parse the data, but to parse it yourself in a key-value fashion, e.g. into something like a Map[String, String], which is generic enough.
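For contrast, the schema-based route in Spark 2.1 would be from_json, but the schema has to enumerate every key up front, which is exactly the problem (mealsSchema below is just a sketch mirroring your Meals object):
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A fixed schema: every meal type has to be listed explicitly.
val mealsSchema = StructType(Seq(
  StructField("breakfast", StringType),
  StructField("lunch", StringType),
  StructField("dinner", StringType),
  StructField("snack", StringType)
))

val withStruct = rawDf.select(col("name"), from_json(col("meals"), mealsSchema).as("m"))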
Here is what you can do instead:
Use the Jackson JSON mapper for Scala:
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// mapper object created on each executor node
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)

val valueAsMap = mapper.readValue[Map[String, String]]("""{"breakfast": "banana", "lunch": "sandwich"}""")
This transforms the JSON string into a Map[String, String], which can also be viewed as a list of (key, value) pairs:
List((breakfast,banana), (lunch,sandwich))
Now the Apache Spark part comes into play. Define a user-defined function (UDF) that parses the string and outputs the list of (key, value) pairs:
val jsonToArray = udf((json: String) => {
  mapper.readValue[Map[String, String]](json).toList
})
Apply that transformation to the "meals" column, which turns it into a column of type array. After that, explode that column and select the key entry as the meal column and the value entry as the food column:
val df1 = rawDf.select(col("name"), explode(jsonToArray(col("meals"))).as("meals"))
val finalDF = df1.select(col("name"), col("meals._1").as("meal"), col("meals._2").as("food"))
Calling finalDF.show() outputs:
+----+---------+--------+
|name|     meal|    food|
+----+---------+--------+
| Tom|breakfast|  banana|
| Tom|    lunch|sandwich|
|Alex|breakfast|  yogurt|
|Alex|    lunch|   pizza|
|Alex|   dinner|   pasta|
|Lisa|    lunch|   sushi|
|Lisa|   dinner| lasagna|
|Lisa|    snack|   apple|
+----+---------+--------+
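As a side note, if you can ever upgrade past Spark 2.1: here is a minimal UDF-free sketch, assuming Spark 2.2+ where from_json also accepts a MapType schema (explode on a map column yields key and value columns):
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{MapType, StringType}

// Parse straight into a map, then explode it into key/value rows.
val finalDF = rawDf
  .select(col("name"), explode(from_json(col("meals"), MapType(StringType, StringType))))
  .withColumnRenamed("key", "meal")
  .withColumnRenamed("value", "food")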
Upvotes: 1