Reputation: 35
**JSON structure:**
aa.json
[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]
Code to read the DataFrame:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
a = sqlContext.read.option('multiline', "true").json('aa.json')
a.show()
+----+----+
| foo|foo1|
+----+----+
|null|null|
+----+----+
a.printSchema()
root
|-- foo: string (nullable = true)
|-- foo1: string (nullable = true)
The code above is used to read this JSON; it is able to parse the schema but not the data.
Upvotes: 2
Views: 1273
Reputation: 43534
Applying some regular expressions and converting to an RDD may work for you here.
First, read the file in as plain text:
a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
#+-------------------------------------+
#|value |
#+-------------------------------------+
#|[[{"foo":"test1"},{"foo1":"test21"}],|
#|[{"foo":"test2"},{"foo1":"test22"}], |
#|[{"foo":"test3"},{"foo1":"test23"}]] |
#+-------------------------------------+
Now we can use pyspark.sql.functions.regexp_replace
to remove the extra square brackets and the trailing comma from each line:
from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", r"(^\[(?=\[))|((?<=\])\]$)|(,$)", "").alias("value"))
a.show(truncate=False)
#+-----------------------------------+
#|value |
#+-----------------------------------+
#|[{"foo":"test1"},{"foo1":"test21"}]|
#|[{"foo":"test2"},{"foo1":"test22"}]|
#|[{"foo":"test3"},{"foo1":"test23"}]|
#+-----------------------------------+
The pattern here is a logical OR of the following patterns:

- `^\[(?=\[)`: a `[` at the start of the string followed by another `[` (the second `[` is inside a lookahead, so it is not removed)
- `(?<=\])\]$`: a `]` at the end of the string preceded by another `]` (the first `]` is inside a lookbehind, so it is not removed)
- `,$`: a comma at the end of the string

Any parts that match are replaced by an empty string.
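As a quick sanity check outside of Spark, the same substitution can be reproduced with Python's built-in re module on one raw line from the file (a standalone sketch, not part of the Spark job):
import re

pattern = r"(^\[(?=\[))|((?<=\])\]$)|(,$)"
line = '[[{"foo":"test1"},{"foo1":"test21"}],'
print(re.sub(pattern, "", line))
#[{"foo":"test1"},{"foo1":"test21"}]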
Now convert to an RDD and use json.loads to parse each row into a list of dictionaries. Then merge all of these dictionaries into one dictionary and pass it to the pyspark.sql.Row constructor. Finally, call .toDF() to convert back to a DataFrame.
# From `How to merge two dictionaries in a single expression?`
# This code works for python 2 and 3
def merge_two_dicts(x, y):
    z = x.copy()    # start with x's keys and values
    z.update(y)     # modifies z with y's keys and values & returns None
    return z
import json
from pyspark.sql import Row
from functools import reduce
a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
#+-----+------+
#| foo| foo1|
#+-----+------+
#|test1|test21|
#|test2|test22|
#|test3|test23|
#+-----+------+
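To make the mapping step more concrete, here is the same transformation traced for a single cleaned row in plain Python (a small self-contained sketch that redefines the same helper as above):
import json
from functools import reduce
from pyspark.sql import Row

def merge_two_dicts(x, y):  # same helper as above
    z = x.copy()
    z.update(y)
    return z

value = '[{"foo":"test1"},{"foo1":"test21"}]'
dicts = json.loads(value)                # [{'foo': 'test1'}, {'foo1': 'test21'}]
merged = reduce(merge_two_dicts, dicts)  # {'foo': 'test1', 'foo1': 'test21'}
print(Row(**merged))
#Row(foo='test1', foo1='test21')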
Upvotes: 1