Reputation: 115
I want to read the JSON file in the below format:-
{
"titlename": "periodic",
"atom": [
{
"usage": "neutron",
"dailydata": [
{
"utcacquisitiontime": "2017-03-27T22:00:00Z",
"datatimezone": "+02:00",
"intervalvalue": 28128,
"intervaltime": 15
},
{
"utcacquisitiontime": "2017-03-27T22:15:00Z",
"datatimezone": "+02:00",
"intervalvalue": 25687,
"intervaltime": 15
}
]
}
]
}
I am writing my read line as:
sqlContext.read.json("user/files_fold/testing-data.json").printSchema
But I not getting the desired result-
root
|-- _corrupt_record: string (nullable = true)
Please help me on this
Upvotes: 3
Views: 22303
Reputation: 341
You just need to add this statement with your read statement. It happens because your json is multiline option("multiLine", true).
spark.read.option("multiLine", true).option("mode", "PERMISSIVE") .json("/path/to/user.json")
Upvotes: 0
Reputation: 586
This has already been answered nicely by other contributors, but I had one question which is how do i access each nested value/unit of the dataframe.
So, for collections, we can use explode and for struct types we can directly call the unit by dot(.)
.
scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]
scala> a.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]
scala> b.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
scala>
scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]
scala>
scala> c.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
|-- exploded_atom_struct: struct (nullable = true)
| |-- datatimezone: string (nullable = true)
| |-- intervaltime: long (nullable = true)
| |-- intervalvalue: long (nullable = true)
| |-- utcacquisitiontime: string (nullable = true)
scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]
scala> d.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
|-- exploded_atom_struct: struct (nullable = true)
| |-- datatimezone: string (nullable = true)
| |-- intervaltime: long (nullable = true)
| |-- intervalvalue: long (nullable = true)
| |-- utcacquisitiontime: string (nullable = true)
|-- exploded_atom_struct_last: string (nullable = true)
scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]
scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue| utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic| +02:00| 15| 28128|2017-03-27T22:00:00Z|
| periodic| +02:00| 15| 25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+
So thought of posting it here, in case if anyone has similar questions seeing this question.
Upvotes: 1
Reputation: 97
Spark 2.2 introduced multiLine option which can be used to load JSON (not JSONL) files:
spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
.json("/path/to/user.json")
Upvotes: 3
Reputation: 41957
I suggest using wholeTextFiles
to read the file and apply some functions to convert it to a single-line JSON format.
val json = sc.wholeTextFiles("/user/files_fold/testing-data.json").
map(tuple => tuple._2.replace("\n", "").trim)
val df = sqlContext.read.json(json)
You should have the final valid dataframe
as
+--------------------------------------------------------------------------------------------------------+---------+
|atom |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+
And valid schema
as
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
Upvotes: 5
Reputation: 10082
From the Apache Spark SQL Docs
Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object.
Thus,
{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}
And then:
val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame =
[atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>,
titlename: string]
Upvotes: 0
Reputation: 2480
It probably has something to do with the JSON object stored inside your file, could you print it or make sure it's the one you provided in the question? I'm asking because I took that one and it runs just fine:
val json =
"""
|{
| "titlename": "periodic",
| "atom": [
| {
| "usage": "neutron",
| "dailydata": [
| {
| "utcacquisitiontime": "2017-03-27T22:00:00Z",
| "datatimezone": "+02:00",
| "intervalvalue": 28128,
| "intervaltime": 15
| },
| {
| "utcacquisitiontime": "2017-03-27T22:15:00Z",
| "datatimezone": "+02:00",
| "intervalvalue": 25687,
| "intervaltime": 15
| }
| ]
| }
| ]
|}
""".stripMargin
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
.json(spark.sparkContext.parallelize(Seq(json)))
.printSchema()
Upvotes: 0