Iram Fatma

Reputation: 1

Reading a nested JSON file where the value of a StructType column is a string in PySpark

{ "traffic_fource": "{'name': 'intgreints', 'medium': '(none)', 'source': '(direct)'}" }

This is a parquet file containing data in JSON format, but the value part is wrapped in double quotes, which makes it a string rather than a StructType. I want to unnest it with '_' between the column names, e.g. traffic_fource_name with value intgreints, and traffic_fource_medium with value (none). I already have a function which unnests it the way I want, but only if it recognizes the datatype as StructType or ArrayType, and here the value is in double quotes. So I have done the following steps to get it recognized as a struct type:

        from pyspark.sql.functions import from_json
        from pyspark.sql.types import StructType, StructField, StringType

        schema_traffic = StructType([StructField("name", StringType()),
                                     StructField("medium", StringType()),
                                     StructField("source", StringType())])
        df = df.withColumn("traffic_source", from_json(df.traffic_source, schema_traffic))

This is working fine. The problem I am facing is when I try to pass the schema through a parameter file: the schema read from the file is always a string, on which from_json fails. After reading the schema from the param file it looks like this:

"StructType([StructField("name",StringType()),StructField("medium",StringType()),StructField("source",StringType())])"

Upvotes: 0

Views: 2067

Answers (2)

Mohana B C

Reputation: 5487

When you read the file, the content will be inferred as a string.

You can have a config file with content as shown below, one line per column with its corresponding type:

name:string
medium:string
source:string

Read this file and dynamically create a StructType schema. I used a simple file reader to read and parse the configs; you can use any other config-parsing library if you want.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Map the type names used in the config file to Spark types
type_to_spark = {"int": IntegerType(), "string": StringType()}

with open("config.conf") as f:
    contents = f.read().splitlines()

def prepare_struct(config_line):
    # "name:string" -> StructField("name", StringType())
    col_name, col_type = config_line.split(":")
    return StructField(col_name, type_to_spark[col_type])

schema = StructType([prepare_struct(line) for line in contents])

# Now pass this schema to from_json

UPDATE

You can pass the schema in the form of a DDL string to from_json. Example code:

>>> df = spark.createDataFrame([('{"id": 1,"name": {"fn": "jk","ln": "jh"}}',)]).toDF("json")
>>> df.show(truncate=False)
+-----------------------------------------+
|json                                     |
+-----------------------------------------+
|{"id": 1,"name": {"fn": "jk","ln": "jh"}}|
+-----------------------------------------+

>>> outDF = df.select(from_json("json","id int, name struct<fn:string,ln:string>"))
>>> outDF.show()
+---------------+
|from_json(json)|
+---------------+
|  {1, {jk, jh}}|
+---------------+

>>> outDF.printSchema()
root
 |-- from_json(json): struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: struct (nullable = true)
 |    |    |-- fn: string (nullable = true)
 |    |    |-- ln: string (nullable = true)

You can place your DDL schema in a file, then read the file, store the schema in a variable, and pass that variable to from_json.
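A minimal sketch of that flow (the file name and location are placeholders, and a temp directory stands in for a real param file; the final from_json call is left as a comment since it needs the DataFrame from the example above):

```python
import os
import tempfile

# Write the DDL schema to a file, standing in for a real param file
ddl = "id int, name struct<fn:string,ln:string>"
path = os.path.join(tempfile.mkdtemp(), "schema.ddl")
with open(path, "w") as f:
    f.write(ddl)

# Later: read it back and hand the string straight to from_json
with open(path) as f:
    ddl_schema = f.read().strip()

print(ddl_schema)
# outDF = df.select(from_json("json", ddl_schema))
```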

Upvotes: 0

Iram Fatma

Reputation: 1

Awesome. I do have columns which are nested, so if I again have to use StructType, how can I do that: "FireAnalyse": "[{'value': 'firebase_previous_screen', 'key': {'string_value': 'NewUser', 'int_value': 'None', 'float_value': 'None', 'double_value': 'None'}}, {'value': 'firebase_previous_class', 'key': {'string_value': 'UserDetails', 'int_value': 'None', 'float_value': None, 'double_value': None}}]"

So first I removed the array brackets (the brackets need escaping, since split takes a regex):

        inp_df = inp_df.withColumn("FireAnalyse", func.split(inp_df['FireAnalyse'], '\\]')[0])
        inp_df = inp_df.withColumn("FireAnalyse", func.split(inp_df['FireAnalyse'], '\\[')[1])

        # In the data, 'value' holds the string and 'key' holds the nested struct
        schema = StructType([
            StructField("value", StringType()),
            StructField("key", StructType([
                StructField("string_value", StringType()),
                StructField("int_value", IntegerType()),
                StructField("float_value", FloatType()),
                StructField("double_value", DoubleType()),
            ])),
        ])
        inp_df = inp_df.withColumn("FireAnalyse", from_json(inp_df.FireAnalyse, schema))

Upvotes: 0
