Reputation: 547
I have an RDD created from some JSON; each record in the RDD contains key/value pairs. My RDD looks like this:
myRdd.foreach(println)
{"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527636408955},1],
{"sequence":153,"id":8697389197662617,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637852762},1],
{"sequence":155,"id":8697389381205360,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637858607},1],
{"sequence":136,"id":8697374208897843,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637405129},1],
{"sequence":189,"id":8697413135394406,"trackingInfo":{"row":0,"trackId":14272744,"requestId":"284929d9-6147-4924-a19f-4a308730354c-3348447","rank":0,"videoId":80075830,"location":"PostPlay\/Next"},"type":["Play","Action","Session"],"time":527638558756},1],
{"sequence":130,"id":8697373887446384,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527637394083}]
I would like to convert each record to a row in a Spark DataFrame. The nested fields in trackingInfo should be their own columns, and the type list should also be its own column.
So far I've tried to split it using a case class:
case class Event(
  sequence: String,
  id: String,
  trackingInfo: String,
  location: String,
  row: String,
  trackId: String,
  listrequestId: String,
  videoId: String,
  rank: String,
  requestId: String,
  `type`: String,
  time: String)

val dataframeRdd = myRdd
  .map(line => line.split(","))
  .map(array => Event(
    array(0).split(":")(1),
    array(1).split(":")(1),
    array(2).split(":")(1),
    array(3).split(":")(1),
    array(4).split(":")(1),
    array(5).split(":")(1),
    array(6).split(":")(1),
    array(7).split(":")(1),
    array(8).split(":")(1),
    array(9).split(":")(1),
    array(10).split(":")(1),
    array(11).split(":")(1)
  ))
However, I keep getting java.lang.ArrayIndexOutOfBoundsException: 1 errors.
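The exception comes from the comma split itself. Splitting the whole record on "," also cuts the "type" array apart, so element 10 is just "Action" with no colon in it, and split(":")(1) on that piece indexes past the end of a one-element array. The sketch below reproduces the question's first two steps in plain Scala (no Spark) on the first record. It assumes the trailing ,1], residue shown in the printed output has been dropped. The object name SplitDemo is only for illustration.

```scala
// Reproduces the question's naive parsing on a single record, without Spark.
object SplitDemo {
  // First record from the question (assumption: trailing ",1]," residue removed).
  val record: String =
    """{"sequence":89,"id":8697344444103393,"trackingInfo":{"location":"Browse","row":0,"trackId":14170286,"listId":"cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585","videoId":80000778,"rank":0,"requestId":"ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171"},"type":["Play","Action","Session"],"time":527636408955}"""

  // Same first step as the question: split the whole record on commas.
  val pieces: Array[String] = record.split(",")

  // Indices whose piece has no ':' at all, so split(":")(1) on them throws
  // ArrayIndexOutOfBoundsException: 1.
  val brokenIndices: Seq[Int] =
    pieces.indices.filter(i => pieces(i).split(":").length < 2)
}
```

Here pieces(10) is "Action" (with its quotes) and pieces(11) is "Session"]. Both come from the type array. Because the Event arguments are evaluated left to right, array(10) is the first one to fail. Parsing by attribute name, as the answers below do, avoids depending on comma positions at all.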
What is the best way to do this? As you can see, record number 5 has a slightly different ordering of some attributes. Is it possible to parse based on attribute names instead of splitting on ","?
I'm using Spark 1.6.x
Upvotes: 2
Views: 4412
Reputation: 41957
Your JSON RDD seems to contain invalid JSON records. You need to convert them to valid JSON first:
val validJsonRdd = myRdd.map(x => x.replace(",1],", ",").replace("}]", "}"))
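The replace calls above are tied to the exact ",1]," and "}]" suffixes in the printed output. On most records they leave a trailing comma after the closing brace, which Spark's JSON reader may or may not tolerate. A slightly more defensive alternative (not what this answer uses) keeps only the text from the first { to the last }. It assumes each record holds exactly one JSON object followed only by non-JSON residue. The helper name toValidJson is hypothetical.

```scala
object JsonCleanup {
  // Hypothetical helper: keep only the text from the first '{' to the last '}'.
  // Assumes each record contains exactly one JSON object; anything outside it
  // (such as the ",1]," tuple residue in the question) is discarded.
  def toValidJson(line: String): String = {
    val start = line.indexOf('{')
    val end = line.lastIndexOf('}')
    if (start >= 0 && end > start) line.substring(start, end + 1)
    else line // lines without a brace pair are returned unchanged
  }
}
```

In Spark this would be applied as myRdd.map(JsonCleanup.toValidJson) before passing the result to sqlContext.read.json.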
Then you can use the sqlContext to read the valid JSON RDD into a DataFrame:
val df = sqlContext.read.json(validJsonRdd)
which should give you the following DataFrame (I used the invalid JSON you provided in the question):
+----------------+--------+------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|id              |sequence|time        |trackingInfo                                                                                                                             |type                   |
+----------------+--------+------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|8697344444103393|89      |527636408955|[cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585,Browse,0,ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171,0,14170286,80000778]|[Play, Action, Session]|
|8697389197662617|153     |527637852762|[cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585,Browse,0,ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171,0,14170286,80000778]|[Play, Action, Session]|
|8697389381205360|155     |527637858607|[cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585,Browse,0,ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171,0,14170286,80000778]|[Play, Action, Session]|
|8697374208897843|136     |527637405129|[cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585,Browse,0,ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171,0,14170286,80000778]|[Play, Action, Session]|
|8697413135394406|189     |527638558756|[null,PostPlay/Next,0,284929d9-6147-4924-a19f-4a308730354c-3348447,0,14272744,80075830]                                                  |[Play, Action, Session]|
|8697373887446384|130     |527637394083|[cd7c2c7a-00f6-4035-867f-d1dd7d89972d_6625365X3XX1505943605585,Browse,0,ac12f4e1-5644-46af-87d1-ec3b92ce4896-4071171,0,14170286,80000778]|[Play, Action, Session]|
+----------------+--------+------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
and the schema of the DataFrame is:
root
 |-- id: long (nullable = true)
 |-- sequence: long (nullable = true)
 |-- time: long (nullable = true)
 |-- trackingInfo: struct (nullable = true)
 |    |-- listId: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- rank: long (nullable = true)
 |    |-- requestId: string (nullable = true)
 |    |-- row: long (nullable = true)
 |    |-- trackId: long (nullable = true)
 |    |-- videoId: long (nullable = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)
I hope the answer is helpful
Upvotes: 6
Reputation: 47
If myRDD is a pair RDD whose second element holds the JSON string, you can use sqlContext.read.json(myRDD.map(_._2)) to read the JSON into a DataFrame.
Upvotes: 0