Reputation: 516
I would really love some help with parsing nested JSON data using PySpark-SQL. The data has the following schema (the blank lines are redactions for confidentiality purposes):
Schema
root
|-- location_info: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- restaurant_type: string (nullable = true)
| | |
| | |
| | |-- other_data: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- other_data_1: string (nullable = true)
| | | | |-- other_data_2: string (nullable = true)
| | | | |-- other_data_3: string (nullable = true)
| | | | |-- other_data_4: string (nullable = true)
| | | | |-- other_data_5: string (nullable = true)
| | |
| | |-- latitude: string (nullable = true)
| | |
| | |
| | |
| | |
| | |
| | |-- longitude: string (nullable = true)
| | |
| | |
| | |
| | |-- timezone: string (nullable = true)
|-- restaurant_id: string (nullable = true)
My goal
I essentially want to get the data into the following data frame:
restaurant_id | latitude | longitude | timezone
What I have tried
The following code:
from pyspark.sql.functions import col, explode

# Read the file, expecting one JSON object per line
dfj = spark.read.option("multiLine", False).json("/file/path")

# Explode the nested array so each element becomes its own row
result = dfj.select(col('restaurant_id'),
                    explode(col('location_info')).alias('location_info'))

# SQL operation
result.createOrReplaceTempView('result')
subset_data = spark.sql(
    '''
    SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
    FROM result
    '''
).show()
# Also tried this to read in: replace every "{" with a marker, then split on it
# NB: this also splits on nested "{" characters, mangling the inner structs
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
                              .values()
                              .flatMap(lambda x: x
                                       .replace("{", "#!#")
                                       .split("#!#")))
But oddly enough, it only gives me the following for the first object/restaurant ID:
+-------------+--------+---------+--------------------+
|restaurant_id|latitude|longitude|           timestamp|
+-------------+--------+---------+--------------------+
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
+-------------+--------+---------+--------------------+
My research indicated that this may have something to do with the way the JSON files are structured at the source. For example:
{}{
}{
}
That is, the objects appear to be concatenated back-to-back rather than newline-delimited, so the file is neither multiLine nor one-object-per-line JSON. Wondering what to do about this as well?
Thank you very much for reading; any help would really be appreciated. I know I can always count on SO to be helpful!
Upvotes: 2
Views: 10821
Reputation: 1126
The spark.read.json()
reader assumes one JSON object per text line. I'm not sure I follow the insertion of the \n
and then the split... it sounds like the file may be malformed.
Perhaps there is a record separator such as a \r which you can't see. The Linux command od -c <file name> | head -10
will show what the characters are in between records.
If the schema is well known, then supply that schema object; this skips the first pass, which does schema inference. E.g.
spark.read.schema(schema).json('path to directory')
and it will definitely make your read operation much faster. Save the objects in Parquet or Delta Lake format for better performance if you need to query them later.
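For illustration, here is a minimal sketch of what supplying such a schema could look like; the field names are taken from your (redacted) schema printout, and the paths are placeholders:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Assumed structure, reconstructed from the question's redacted schema
location_element = StructType([
    StructField("restaurant_type", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timezone", StringType(), True),
])
schema = StructType([
    StructField("location_info", ArrayType(location_element, True), True),
    StructField("restaurant_id", StringType(), True),
])

# With an explicit schema there is no inference pass: Spark reads the data once
dfj = spark.read.schema(schema).json("/file/path")

# Persist as Parquet so later queries avoid re-parsing the JSON
dfj.write.mode("overwrite").parquet("/output/path")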
Databricks' COPY INTO
or cloudFiles
format will speed up the ingestion/reduce the latency: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html
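If you are on Databricks, an Auto Loader ingestion might look roughly like this (a sketch only: the cloudFiles format exists on Databricks runtimes, and the paths and checkpoint location are placeholders):

# Hypothetical Auto Loader ingestion (Databricks only)
stream_df = (spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .schema(schema)  # reuse the explicit schema from above
             .load("/file/path"))

(stream_df.writeStream
 .format("delta")
 .option("checkpointLocation", "/tmp/checkpoints/restaurants")
 .trigger(once=True)  # process the files currently available, then stop
 .start("/delta/restaurants"))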
Upvotes: 1
Reputation: 516
I was able to solve this by reading the JSON file I've described above as follows; hope it helps:
from pyspark.sql.functions import col, explode

# Reading multiple files in the dir: prefix each top-level object with a
# newline, then split, so every JSON object lands on its own line
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
                              .values()
                              .flatMap(lambda x: x
                                       .replace('{"restaurant_id', '\n{"restaurant_id')
                                       .split('\n')))

# Explode here to have restaurant_id next to each element of the nested array
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
                                          explode(col('location_info')).alias('location_info'))

# Via SQL operation: this will solve the problem for parsing
exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql(
    '''
    SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
    FROM result_1
    '''
).persist()
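For reference, the same projection also works without the temp view, using the DataFrame API directly (equivalent to the SQL above):

# Equivalent select using dot notation on the exploded struct column
subset_data_1 = exploded_source_df_1.select(
    col('restaurant_id'),
    col('location_info.latitude'),
    col('location_info.longitude'),
    col('location_info.timestamp'),
).persist()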
Upvotes: 1