Reputation: 1
{ "traffic_fource": "{'name': 'intgreints', 'medium': '(none)', 'source': '(direct)'}" }
This is a parquet file containing data in JSON format, but the value part is in double quotes, which makes it a string rather than a StructType. I want to unnest it with '_' between the column names, so traffic_source_name will have the value intgreints and traffic_source_medium will have the value (none). I already have a function that unnests it the way I want, but only when it recognizes the datatype as StructType or ArrayType, and here the value is in double quotes. So I have done the following steps to get it recognized as a struct type:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema_traffic = StructType([StructField("name", StringType()), StructField("medium", StringType()), StructField("source", StringType())])
df = df.withColumn("traffic_source", from_json(df.traffic_source, schema_traffic))
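For context, the flattened output I am after would look like this (a minimal sketch; this is not my actual unnest function):

from pyspark.sql import functions as func

df = df.select(
    func.col("traffic_source.name").alias("traffic_source_name"),
    func.col("traffic_source.medium").alias("traffic_source_medium"),
    func.col("traffic_source.source").alias("traffic_source_source"),
)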
The from_json conversion works fine. But the problem I am facing is when I try to pass the schema through a parameter file: the schema read from the file is always a string, on which from_json fails. After reading the schema from the param file it looks like this:
"StructType([StructField("name",StringType()),StructField("medium",StringType()),StructField("source",StringType())])"
Upvotes: 0
Views: 2067
Reputation: 5487
When you read the files, the content will be inferred as a string only.
You can have a config file with content as shown below; add each column name and its corresponding type:
name:string
medium:string
source:string
Read this file and dynamically create the StructType schema. I used a simple file reader to read and parse the config; you can use any other config parser library if you want.
type_to_spark = {"int": IntegerType(), "string": StringType()}
with open("config.conf") as f:
contents = f.read().splitlines()
def prepare_struct(config_lines):
configs = config_lines.split(":")
return StructField(configs[0], type_to_spark[configs[1]])
schema = StructType(list(map(lambda s: prepare_struct(s), contents)))
# Now pass this schema to from_json
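For example, you can now use it the same way as the hard-coded schema in the question:

from pyspark.sql.functions import from_json

df = df.withColumn("traffic_source", from_json(df.traffic_source, schema))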
You can pass the schema to from_json in the form of a DDL string. Example code:
>>> df = spark.createDataFrame([('{"id": 1,"name": {"fn": "jk","ln": "jh"}}',)]).toDF("json")
>>> df.show(truncate=False)
+-----------------------------------------+
|json |
+-----------------------------------------+
|{"id": 1,"name": {"fn": "jk","ln": "jh"}}|
+-----------------------------------------+
>>> outDF = df.select(from_json("json","id int, name struct<fn:string,ln:string>"))
>>> outDF.show()
+---------------+
|from_json(json)|
+---------------+
| {1, {jk, jh}}|
+---------------+
>>> outDF.printSchema()
root
|-- from_json(json): struct (nullable = true)
| |-- id: integer (nullable = true)
| |-- name: struct (nullable = true)
| | |-- fn: string (nullable = true)
| | |-- ln: string (nullable = true)
You can place your DDL schema in a file, then read the file, store the schema in a variable, and pass that variable to from_json.
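A minimal sketch of that approach (the file name schema.ddl is just an assumption for illustration):

from pyspark.sql.functions import from_json

# Hypothetical file; schema.ddl holds one line such as:
#   id int, name struct<fn:string,ln:string>
with open("schema.ddl") as f:
    ddl_schema = f.read().strip()

outDF = df.select(from_json("json", ddl_schema))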
Upvotes: 0
Reputation: 1
Awesome. I do have columns which are nested, so if I again have to use StructType, how can I do that? Example:
"FireAnalyse": "[{'value': 'firebase_previous_screen', 'key': {'string_value': 'NewUser', 'int_value': 'None', 'float_value': 'None', 'double_value': 'None'}}, {'value': 'firebase_previous_class', 'key': {'string_value': 'UserDetails', 'int_value': 'None', 'float_value': None, 'double_value': None}}]"
So first I removed the array brackets (the split pattern is a regex, so the brackets need escaping):

from pyspark.sql import functions as func

inp_df = inp_df.withColumn("FireAnalyse", func.split(inp_df['FireAnalyse'], '\\]')[0])
inp_df = inp_df.withColumn("FireAnalyse", func.split(inp_df['FireAnalyse'], '\\[')[1])
from pyspark.sql.functions import from_json
from pyspark.sql.types import DoubleType, FloatType, IntegerType, StringType, StructField, StructType

# Field names and types follow the sample data above: 'value' is a string, 'key' is a nested struct
schema = StructType([
    StructField("value", StringType()),
    StructField("key", StructType([
        StructField("string_value", StringType()),
        StructField("int_value", IntegerType()),
        StructField("float_value", FloatType()),
        StructField("double_value", DoubleType()),
    ])),
])
inp_df = inp_df.withColumn("FireAnalyse", from_json(inp_df.FireAnalyse, schema))
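Alternatively, I wonder whether I could skip the bracket removal and wrap the struct schema in ArrayType instead; a rough sketch of what I mean:

from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType

# Parse the whole JSON array in one go, reusing the element schema above
inp_df = inp_df.withColumn("FireAnalyse", from_json(inp_df.FireAnalyse, ArrayType(schema)))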
Upvotes: 0