Ninja
Ninja

Reputation: 115

How to read the json file in spark using scala?

I want to read the JSON file in the below format:-

 {
  "titlename": "periodic",
    "atom": [
         {
          "usage": "neutron",
          "dailydata": [
    {
      "utcacquisitiontime": "2017-03-27T22:00:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 28128,
      "intervaltime": 15          
    },
    {
      "utcacquisitiontime": "2017-03-27T22:15:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 25687,
      "intervaltime": 15          
    }
   ]
  }
 ]
}

I am writing my read line as:

sqlContext.read.json("user/files_fold/testing-data.json").printSchema

But I not getting the desired result-

root                                                                            
  |-- _corrupt_record: string (nullable = true)

Please help me on this

Upvotes: 3

Views: 22303

Answers (6)

chandra prakash kabra
chandra prakash kabra

Reputation: 341

You just need to add this statement with your read statement. It happens because your json is multiline option("multiLine", true).

spark.read.option("multiLine", true).option("mode", "PERMISSIVE")  .json("/path/to/user.json")

Upvotes: 0

Sankar
Sankar

Reputation: 586

This has already been answered nicely by other contributors, but I had one question which is how do i access each nested value/unit of the dataframe.

So, for collections, we can use explode and for struct types we can directly call the unit by dot(.).

scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]

scala> a.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)


scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]

scala> b.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)


scala>

scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]

scala>

scala> c.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)


scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]


scala> d.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)
 |-- exploded_atom_struct_last: string (nullable = true)


scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]

scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue|  utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic|      +02:00|          15|        28128|2017-03-27T22:00:00Z|
| periodic|      +02:00|          15|        25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+

So thought of posting it here, in case if anyone has similar questions seeing this question.

Upvotes: 1

Harshad_Pardeshi
Harshad_Pardeshi

Reputation: 97

Spark 2.2 introduced multiLine option which can be used to load JSON (not JSONL) files:

spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
  .json("/path/to/user.json")

Upvotes: 3

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

I suggest using wholeTextFiles to read the file and apply some functions to convert it to a single-line JSON format.

val json = sc.wholeTextFiles("/user/files_fold/testing-data.json").
  map(tuple => tuple._2.replace("\n", "").trim)

val df = sqlContext.read.json(json)

You should have the final valid dataframe as

+--------------------------------------------------------------------------------------------------------+---------+
|atom                                                                                                    |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+

And valid schema as

root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)

Upvotes: 5

philantrovert
philantrovert

Reputation: 10082

From the Apache Spark SQL Docs

Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object.

Thus,

{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}

And then:

val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame = 
[atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, 
titlename: string]

Upvotes: 0

Andrei T.
Andrei T.

Reputation: 2480

It probably has something to do with the JSON object stored inside your file, could you print it or make sure it's the one you provided in the question? I'm asking because I took that one and it runs just fine:

val json =
  """
    |{
    |  "titlename": "periodic",
    |  "atom": [
    |    {
    |      "usage": "neutron",
    |      "dailydata": [
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:00:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 28128,
    |          "intervaltime": 15
    |        },
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:15:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 25687,
    |          "intervaltime": 15
    |        }
    |      ]
    |    }
    |  ]
    |}
  """.stripMargin

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
  .json(spark.sparkContext.parallelize(Seq(json)))
  .printSchema()

Upvotes: 0

Related Questions