Reputation: 37
I have a JSON file that looks like this:
{
"tags": [
{
"1": "NpProgressBarTag",
"2": "userPath",
"3": "screen",
"4": 6,
"12": 9,
"13": "buttonName",
"16": 0,
"17": 10,
"18": 5,
"19": 6,
"20": 1,
"35": 1,
"36": 1,
"37": 4,
"38": 0,
"39": "npChannelGuid",
"40": "npShowGuid",
"41": "npCategoryGuid",
"42": "npEpisodeGuid",
"43": "npAodEpisodeGuid",
"44": "npVodEpisodeGuid",
"45": "npLiveEventGuid",
"46": "npTeamGuid",
"47": "npLeagueGuid",
"48": "npStatus",
"50": 0,
"52": "gupId",
"54": "deviceID",
"55": 1,
"56": 0,
"57": "uiVersion",
"58": 1,
"59": "deviceOS",
"60": 1,
"61": 0,
"62": "channelLineupID",
"63": 2,
"64": "userProfile",
"65": "sessionId",
"66": "hitId",
"67": "actionTime",
"68": "seekTo",
"69": "seekFrom",
"70": "currentPosition"
}
]
}
I tried to create a DataFrame using:
val path = "some/path/to/jsonFile.json"
val df = sqlContext.read.json(path)
df.show()
When I run this, I get:
df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]
How do we create a DataFrame based on the contents of the "tags" key? All I need is to pull the data out of "tags" and apply a case class like this:
case class ProgLang(id: String, `type`: String)
I need to convert this JSON data into a DataFrame with two columns, e.g. .toDF("id", "type"). Can anyone shed some light on this error?
Upvotes: 0
Views: 1373
Reputation: 11479
Try the following code if your JSON file is not very big:
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.json(spark.sparkContext.wholeTextFiles("some/path/to/jsonFile.json").values)
Upvotes: 0
Reputation: 5891
val path = "some/path/to/jsonFile.json"
spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
.json(path)
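Reading with multiLine only gets you a DataFrame with a single "tags" array column. To get from there to the two columns the question asks for, something like the following might work. It is a minimal sketch rather than a tested solution for your data: tagsToPairs is a hypothetical helper name, it assumes Spark 2.2 or later (where the multiLine option exists), and it is written so it can be pasted into spark-shell.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, lit, map}

// Hypothetical helper (not part of the original answer): reads a multi-line
// JSON file and returns one (id, type) row per key/value pair inside "tags".
def tagsToPairs(spark: SparkSession, path: String): DataFrame = {
  val raw = spark.read
    .option("multiLine", true) // the whole file is a single JSON document
    .json(path)

  // One row per element of the "tags" array, with the struct fields as columns.
  val tags = raw.select(explode(col("tags")).as("tag")).select("tag.*")

  // Alternating key/value columns; values are cast to string because the
  // source mixes strings and numbers.
  val keyValues = tags.columns.flatMap { name =>
    Seq(lit(name), col(s"`$name`").cast("string"))
  }

  // explode on a map column yields two columns, renamed here to id and type.
  tags.select(explode(map(keyValues: _*))).toDF("id", "type")
}
```

Calling tagsToPairs(spark, path).show() should then print rows such as ("1", "NpProgressBarTag") and ("4", "6"). Casting everything to string is a design choice that keeps a single "type" column; you lose the distinction between numeric and string values.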
Upvotes: 0
Reputation: 22895
You may modify the JSON using Circe.
Given that your values are sometimes Strings and other times Numbers, this was quite complex.
import io.circe._, io.circe.parser._, io.circe.generic.semiauto._
val json = """ ... """ // your JSON here.
val doc = parse(json).right.get
val mappedDoc = doc.hcursor.downField("tags").withFocus { array =>
  array.mapArray { jsons =>
    jsons.map { json =>
      json.mapObject { o =>
        o.mapValues { v =>
          // Cast numbers to strings.
          if (v.isString) v else Json.fromString(v.asNumber.get.toString)
        }
      }
    }
  }
}
final case class ProgLang(id: String, `type`: String )
final case class Tags(tags: List[Map[String, String]])
implicit val TagsDecoder: Decoder[Tags] = deriveDecoder
// as[Tags] returns an Either; unwrap it (this throws if decoding failed).
val tags = mappedDoc.top.get.as[Tags].right.get
val data = for {
  tag <- tags.tags
  (id, _type) <- tag
} yield ProgLang(id, _type)
Now you have a List of ProgLang. You may create a DataFrame directly from it, save it as a file with one JSON object per line, save it as a CSV file, etc.
If the file is very big, you may use fs2 to stream it while transforming; it integrates nicely with Circe.
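For the DataFrame option, a minimal sketch could look like this. It assumes a SparkSession is already available, and toDataFrame is a hypothetical helper name introduced only for illustration. It goes through tuples so the column names are set explicitly with toDF.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

final case class ProgLang(id: String, `type`: String)

// Hypothetical helper: turns the decoded list into a two-column DataFrame.
def toDataFrame(spark: SparkSession, data: List[ProgLang]): DataFrame = {
  import spark.implicits._ // brings toDF into scope for local collections
  data.map(p => (p.id, p.`type`)).toDF("id", "type")
}
```

With the data value from above, toDataFrame(spark, data).show() should display the id and type columns.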
DISCLAIMER: I am far from being a "pro" with Circe, and this seems over-complicated for something that looks like a simple task. There is probably a better, cleaner way of doing it (maybe using Optics?), but hey, it works! Anyway, if anyone knows a better way to solve this, feel free to edit the question or provide your own answer.
Upvotes: 1