The documentation of schema_of_json says:
Parameters:
json: Column or str
a JSON string or a foldable string column containing a JSON string.
But executing the following code, which passes a column, raises an error:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
["""{"foo":"hello","bar":"world"}"""],
["""{"foo":"hello","bar":"world"}"""],
["""{"foo":"hello","bar":"world"}"""]
], ['json_column'])
df.printSchema()
# root
# |-- json_column: string (nullable = true)
df_schema = df.select('json_column', f.schema_of_json(f.col('json_column')).alias('schema'))
df_schema.show(truncate=False)
# pyspark.sql.utils.AnalysisException: cannot resolve 'schema_of_json(`json_column`)' due to data type mismatch:
# The input json should be a foldable string expression and not null; however, got `json_column`.;
The only way I know to use this function is to hard-code a JSON object, but that is useless in a production scenario because I can't dynamically parse the content of the column.
df_schema = df.select('json_column', f.schema_of_json('{"foo":"hello","bar":"world"}').alias('schema'))
df_schema.show(truncate=False)
# +-----------------------------+------------------------------------+
# |json_column |schema |
# +-----------------------------+------------------------------------+
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# |{"foo":"hello","bar":"world"}|STRUCT<`bar`: STRING, `foo`: STRING>|
# +-----------------------------+------------------------------------+
Am I missing a step, or is the documentation unclear about how this should be used?
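schema_of_json only accepts a foldable expression, i.e. one Spark can reduce to a constant before running the query, such as a string literal. A column reference has a different value on every row, so it is rejected. To infer the schema from the data itself, convert the column to an RDD, let spark.read.json infer the schema from it, and pass that schema to from_json: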
from pyspark.sql.functions import col, from_json
# Convert the JSON string column into an RDD of strings
s = df.select(col('json_column').alias('j')).rdd.map(lambda x: x.j)
s.collect()
# ['{"foo":"hello","bar":"world"}',
#  '{"foo":"hello","bar":"world"}',
#  '{"foo":"hello","bar":"world"}']
# Let the JSON reader infer a schema (a StructType) from the RDD
json_schema = spark.read.json(s).schema
# Parse the string column into a struct using the inferred schema
df1 = df.select('*', from_json('json_column', schema=json_schema).alias('jsonread'))
df1.printSchema()