R7777777

Reputation: 37

I need to create a Spark DataFrame from a nested JSON file in Scala

I have a JSON file that looks like this:

{
  "tags": [
    {
      "1": "NpProgressBarTag",
      "2": "userPath",
      "3": "screen",
      "4": 6,
      "12": 9,
      "13": "buttonName",
      "16": 0,
      "17": 10,
      "18": 5,
      "19": 6,
      "20": 1,
      "35": 1,
      "36": 1,
      "37": 4,
      "38": 0,
      "39": "npChannelGuid",
      "40": "npShowGuid",
      "41": "npCategoryGuid",
      "42": "npEpisodeGuid",
      "43": "npAodEpisodeGuid",
      "44": "npVodEpisodeGuid",
      "45": "npLiveEventGuid",
      "46": "npTeamGuid",
      "47": "npLeagueGuid",
      "48": "npStatus",
      "50": 0,
      "52": "gupId",
      "54": "deviceID",
      "55": 1,
      "56": 0,
      "57": "uiVersion",
      "58": 1,
      "59": "deviceOS",
      "60": 1,
      "61": 0,
      "62": "channelLineupID",
      "63": 2,
      "64": "userProfile",
      "65": "sessionId",
      "66": "hitId",
      "67": "actionTime",
      "68": "seekTo",
      "69": "seekFrom",
      "70": "currentPosition"
    }
  ]
}

I tried to create a DataFrame using:

val path = "some/path/to/jsonFile.json"
val df = sqlContext.read.json(path)
df.show()

When I run this, I get:

df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

How do I create a DataFrame from the contents of the "tags" key? All I need is to pull the data out of "tags" and apply a case class like this:

case class ProgLang(id: String, `type`: String) // `type` is a reserved word in Scala, so it needs backticks

I need to convert this JSON data into a DataFrame with two columns, e.g. .toDF("id", "type"). Can anyone shed some light on this error?

Upvotes: 0

Views: 1373

Answers (3)

vaquar khan

Reputation: 11479

Try the following code if your JSON file is not very big. It reads the whole file as a single record instead of line by line:

    val spark = SparkSession.builder().getOrCreate()
    val df = spark.read.json(spark.sparkContext.wholeTextFiles("some/path/to/jsonFile.json").values)

Upvotes: 0

Kishore

Reputation: 5891

val path = "some/path/to/jsonFile.json"
spark.read
  .option("multiLine", true).option("mode", "PERMISSIVE")
  .json(path)
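
By default Spark expects one JSON object per line, which is why a pretty-printed file comes back as a single _corrupt_record column; the multiLine option tells it to parse the whole document. To get from there to the two-column shape the question asks for, one possible approach is to explode "tags", cast every value to a string, and explode a key/value map. This is only a sketch: the local session, the shortened sample data, the temp file, and the names wide, keyValues and pairs are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, lit, map}

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("tags-sketch").getOrCreate()

// Shortened stand-in for the question's file (hypothetical sample data).
val sample =
  """{
    |  "tags": [
    |    { "1": "NpProgressBarTag", "2": "userPath", "4": 6 }
    |  ]
    |}""".stripMargin
val path = Files.createTempFile("tags", ".json")
Files.write(path, sample.getBytes("UTF-8"))

// multiLine lets Spark parse a JSON document that spans several lines.
val raw = spark.read.option("multiLine", true).json(path.toString)

// One row per element of "tags", with each numbered key as its own column.
val wide = raw.select(explode(col("tags")).as("tag")).select("tag.*")

// Build map(key1, value1, key2, value2, ...) with every value cast to string,
// then explode the map into one (id, type) row per entry.
val keyValues = wide.columns.flatMap(c => Seq(lit(c), col(s"`$c`").cast("string")))
val pairs = wide.select(explode(map(keyValues: _*)).as(Seq("id", "type")))

pairs.show(false)
```

Casting to string first matters because the values mix strings and numbers, and a Spark map needs a single value type.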

Upvotes: 0

You may modify the JSON using Circe.

Given that your values are sometimes Strings and other times Numbers, this was quite complex.

import io.circe._, io.circe.parser._, io.circe.generic.semiauto._

val json = """ ... """ // your JSON here.
val doc = parse(json).right.get
val mappedDoc = doc.hcursor.downField("tags").withFocus { array =>
  array.mapArray { jsons =>
    jsons.map { json =>
      json.mapObject { o =>
        o.mapValues { v =>
          // Cast numbers to strings.
          if (v.isString) v else Json.fromString(v.asNumber.get.toString)
        }
      }
    }
  }
}

final case class ProgLang(id: String, `type`: String )
final case class Tags(tags: List[Map[String, String]])
implicit val TagsDecoder: Decoder[Tags] = deriveDecoder

// `as` returns an Either, so unwrap it before reading the decoded value.
val tags = mappedDoc.top.get.as[Tags].right.get
val data = for {
  tag <- tags.tags
  (id, _type) <- tag
} yield ProgLang(id, _type)

Now that you have a List of ProgLang, you can create a DataFrame directly from it, save it as a file with one JSON object per line, save it as a CSV file, etc.
If the file is very big, you can use fs2 to stream it while transforming; it integrates nicely with Circe.
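
As a rough sketch of the DataFrame step, assuming a local SparkSession and a couple of hand-written ProgLang values standing in for the decoded list, the conversion could look like this. Mapping to tuples first is just one way to name the columns explicitly with toDF.

```scala
import org.apache.spark.sql.SparkSession

final case class ProgLang(id: String, `type`: String)

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("proglang-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the list produced by the Circe code.
val data = List(ProgLang("1", "NpProgressBarTag"), ProgLang("4", "6"))

// Convert to tuples so the column names can be given explicitly.
val df = data.map(p => (p.id, p.`type`)).toDF("id", "type")

df.show(false)
```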


DISCLAIMER: I am far from being a "pro" with Circe. This seems over-complicated for what looks like a simple task, and there is probably a better, cleaner way of doing it (maybe using Optics?), but hey, it works! Anyway, if anyone knows a better way to solve this, feel free to edit the question or provide your own.

Upvotes: 1
