Reputation: 2541
I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
I've tried mapping over each row with json.loads:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
But this returns a TypeError: expected string or buffer
I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually specifying the schema:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
But I get the same TypeError.
Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
I get this error: AttributeError: 'unicode' object has no attribute 'get'.
Upvotes: 65
Views: 193593
Reputation: 11
This answer is for added context if your JSON strings are JSON Arrays instead of objects (I can't comment since I don't have rep). If you use Martin Tapp's solid answer it will return null values for your columns.
tl;dr
If your JSON strings are array objects like so:
[{"a":1, "b":1.0}]
spark.read.json will return a dataframe that contains the schema of the elements in those arrays, and not the array itself. from_json isn't happy with this, so to be as specific as it wants you can wrap the schema inferred by spark.read.json in an ArrayType, and it will parse properly (instead of returning null values for everything).
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType
array_item_schema = \
spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema
json_array_schema = ArrayType(array_item_schema, True)
arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))
objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))
As an addendum to Nolan Conaway's answer, it seems that when your JSON is of the form
[
{
"a": 1.0,
"b": 1
},
{
"a": 0.0,
"b": 2
}
]
where the top-level value is an array (and not an object), pyspark's spark.read.json() treats the array as a collection of objects to be converted into rows instead of a single row.
See example run in PySpark 3.3.0 shell:
>>> myjson = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
| a| b|
+---+---+
|1.0| 1|
|2.0| 2|
|3.0| 3|
|4.0| 4|
+---+---+
>>> spark_read_df.printSchema()
root
|-- a: double (nullable = true)
|-- b: long (nullable = true)
We can see that myjson and myotherjson, which were JSON arrays of JSON objects, got expanded to have a row for each object they contained. It also smoothly handled the case where one of the JSON strings, rawobjectjson, is just a raw object. I think the documentation falls a little short here, as I couldn't find mention of this handling for array objects.
Now let's create a dataframe with a column of JSON strings. We'll drop rawobjectjson because, as we'll see, from_json requires each string to have the same schema (and this includes the top-level array, if present).
>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
... (myjson,),
... (myotherjson,),
... ]
>>> json_df_schema = StructType([
... StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
| json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+
Now here's where I tried to use the schema inferred by spark.read.json, passing it to from_json to read the JSON column into objects, but it kept returning columns that were entirely null. As Nolan Conaway mentioned, this happens when the schema passed to from_json can't be applied to the given strings.
The issue is that from_json sees the top level of these strings as an array but, as spark_read_df.printSchema() shows, the schema inferred by spark.read.json() ignores the array level.
So the solution I ended up going with was just accounting for the top level array in the schema when doing the read.
from pyspark.sql import functions as F
# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()
# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)
json_extracted_df = raw_json_df.select(
F.from_json('json_strings', json_array_schema)
.alias('json_arrays')
)
>>> json_extracted_df.show()
+--------------------+
| json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
| [{3.0, 3}]|
+--------------------+
>>> json_extracted_df.printSchema()
root
|-- json_arrays: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: double (nullable = true)
| | |-- b: long (nullable = true)
From there the objects can be pulled out of the array using pyspark.sql.functions.explode:
>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+
>>> exploded_df.printSchema()
root
|-- objects: struct (nullable = true)
| |-- a: double (nullable = true)
| |-- b: long (nullable = true)
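If you want plain columns rather than a struct, one extra step (a small sketch, not part of the original run above) is to expand the struct with a star select:
>>> flat_df = exploded_df.select('objects.*')  # expand the struct fields into top-level columns
>>> flat_df.show()                             # expect columns a and b, one row per exploded object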
Upvotes: 1
Reputation: 1369
If you don't know the schema of each JSON document (and they can differ), you can use:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# ... here you get your DF
# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))
Note that it won't keep any other column present in your dataset. From: https://github.com/apache/spark/pull/22775
Upvotes: 1
Reputation: 11
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import json

spark = SparkSession.builder.getOrCreate()

def map2json(d):
    # serialize a Python dict (e.g. a Spark map value) to a JSON string
    return json.dumps(d)

# register as a SQL UDF that returns a string
spark.udf.register("map2json", lambda d: map2json(d), StringType())
spark.sql("select map2json(map('a', '1'))").show()
Upvotes: 1
Reputation: 3376
For Spark 2.1+, you can use from_json, which preserves the other non-JSON columns in the dataframe, as follows:
from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df = df.withColumn('json', from_json(col('json'), json_schema))
You let Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but the correctly decoded JSON structure, i.e., a nested StructType, and all the other columns of df are preserved as-is.
You can access the json content as follows:
df.select(col('json.header').alias('header'))
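As a short usage sketch against the question's sample data (assuming df here is the question's dataframe after the withColumn step above), nested fields can be selected with dot notation:
df.select(
    col('json.header.foo').alias('foo'),
    col('json.body.name').alias('name')
).show()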
Upvotes: 122
Reputation: 1305
Here's a concise (Spark SQL) version of @nolan-conaway's parseJSONCols function.
SELECT
explode(
from_json(
concat('{"data":',
'[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
'}'),
'data array<struct<a:DOUBLE, b:INT>>'
).data) as data;
PS. I've added the explode function as well :P
You'll need to know some Hive SQL types.
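The same DDL-style schema string also works from the DataFrame API; a minimal sketch (the dataframe df and its string column json_col are hypothetical, and recent Spark versions accept an array type directly, so no concat wrapping is needed):
from pyspark.sql import functions as F

parsed = df.select(
    F.explode(
        F.from_json('json_col', 'array<struct<a:DOUBLE, b:INT>>')
    ).alias('data')
)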
Upvotes: 6
Reputation: 2757
Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly braces {} and will produce an incorrect schema (resulting in null values) if your data looks like:
[
{
"a": 1.0,
"b": 1
},
{
"a": 0.0,
"b": 2
}
]
I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:
import pyspark.sql.functions as psf

def parseJSONCols(df, *cols, sanitize=True):
"""Auto infer the schema of a json column and parse into a struct.
rdd-based schema inference works if you have well-formatted JSON,
like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
can fix everything by wrapping the data in another JSON object
(``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
automatically performs the wrapping and unwrapping.
The schema inference is based on this
`SO Post <https://stackoverflow.com/a/45880574/>`_.
Parameters
----------
df : pyspark dataframe
Dataframe containing the JSON cols.
*cols : string(s)
Names of the columns containing JSON.
sanitize : boolean
Flag indicating whether you'd like to sanitize your records
by wrapping and unwrapping them in another JSON object layer.
Returns
-------
pyspark dataframe
A dataframe with the decoded columns.
"""
res = df
for i in cols:
# sanitize if requested.
if sanitize:
res = (
res.withColumn(
i,
psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
)
)
# infer schema and apply it
schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
res = res.withColumn(i, psf.from_json(psf.col(i), schema))
# unpack the wrapped object if needed
if sanitize:
res = res.withColumn(i, psf.col(i).data)
return res
Note: psf = pyspark.sql.functions.
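A hypothetical usage sketch against the question's dataframe (assuming spark, psf, and the df with its json column from the question are in scope):
parsed_df = parseJSONCols(df, 'json', sanitize=True)
parsed_df.printSchema()                    # json is now a struct with header/body fields
parsed_df.select('json.body.name').show()  # pull a nested field out of the parsed struct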
Upvotes: 28
Reputation: 13926
Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).
For example:
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
|-- body: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
| |-- sub_json: struct (nullable = true)
| | |-- id: long (nullable = true)
| | |-- sub_sub_json: struct (nullable = true)
| | | |-- col1: long (nullable = true)
| | | |-- col2: string (nullable = true)
|-- header: struct (nullable = true)
| |-- foo: string (nullable = true)
| |-- id: long (nullable = true)
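From there (a small follow-up sketch, not in the original answer), nested fields can be selected with dot notation on the new dataframe:
>>> new_df.select('header.foo', 'body.name', 'body.sub_json.id').show()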
Upvotes: 52