Udit Mittal

Reputation: 35

How to read multilevel json in pyspark?

**JSON structure:**
aa.json

[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]

Code to read DataFrame:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

a = sqlContext.read.option('multiline', "true").json('aa.json')
a.show()
+----+----+
| foo|foo1|
+----+----+
|null|null|
+----+----+

a.printSchema()
root
 |-- foo: string (nullable = true)
 |-- foo1: string (nullable = true)

These are the lines I'm using to read this JSON; it is able to infer the schema but not the data.

Upvotes: 2

Views: 1273

Answers (1)

pault

Reputation: 43534

Applying some regular expressions and converting to an RDD may work for you here.

First, read the file as plain text with spark.read.text:

a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
#+-------------------------------------+
#|value                                |
#+-------------------------------------+
#|[[{"foo":"test1"},{"foo1":"test21"}],|
#|[{"foo":"test2"},{"foo1":"test22"}], |
#|[{"foo":"test3"},{"foo1":"test23"}]] |
#+-------------------------------------+

Now we can use pyspark.sql.functions.regexp_replace to remove the extra square brackets and the trailing comma from each line:

from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", r"(^\[(?=\[))|((?<=\])\]$)|(,$)", "").alias("value"))
a.show(truncate=False)
#+-----------------------------------+
#|value                              |
#+-----------------------------------+
#|[{"foo":"test1"},{"foo1":"test21"}]|
#|[{"foo":"test2"},{"foo1":"test22"}]|
#|[{"foo":"test3"},{"foo1":"test23"}]|
#+-----------------------------------+

The pattern here is a logical OR of the following patterns:

  • ^\[(?=\[): Start of string followed by [[ (the second [ is inside a lookahead, so it is not removed)
  • (?<=\])\]$: ]] at the end of the string (the first ] is inside a lookbehind, so it is not removed)
  • ,$: A comma at the end of the string

Any patterns that match are replaced by an empty string.
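As a quick sanity check outside Spark, you can run the same pattern through Python's re module on a few sample lines (a standalone illustration only, not part of the Spark job):

import re

# Same pattern as in regexp_replace above, written as a raw string
pattern = r"(^\[(?=\[))|((?<=\])\]$)|(,$)"

sample_lines = [
    '[[{"foo":"test1"},{"foo1":"test21"}],',  # leading [[ and trailing comma
    '[{"foo":"test2"},{"foo1":"test22"}],',   # trailing comma only
    '[{"foo":"test3"},{"foo1":"test23"}]]',   # trailing ]]
]

for line in sample_lines:
    print(re.sub(pattern, "", line))
#[{"foo":"test1"},{"foo1":"test21"}]
#[{"foo":"test2"},{"foo1":"test22"}]
#[{"foo":"test3"},{"foo1":"test23"}]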

Now convert to rdd and use json.loads to parse your rows into lists of dictionaries. Then merge all of these dictionaries together into one dictionary and call the pyspark.sql.Row constructor. Finally call .toDF to convert back to a DataFrame.

# From `How to merge two dictionaries in a single expression?`
# This code works for python 2 and 3
def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

import json
from pyspark.sql import Row
from functools import reduce 

a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
#+-----+------+
#|  foo|  foo1|
#+-----+------+
#|test1|test21|
#|test2|test22|
#|test3|test23|
#+-----+------+
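If you are only on Python 3.5+, the merge_two_dicts helper can optionally be replaced with dictionary unpacking inside the same pipeline. A minimal sketch of that variant, assuming a is the cleaned single-column DataFrame from the regexp_replace step above:

import json
from functools import reduce
from pyspark.sql import Row

# Python 3.5+ only: merge the per-row dictionaries with {**d1, **d2}
# instead of the merge_two_dicts helper; produces the same DataFrame as above
a.rdd.map(
    lambda x: Row(**reduce(lambda d1, d2: {**d1, **d2}, json.loads(x['value'])))
).toDF().show()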


Upvotes: 1
