Reputation: 1739
I'm trying to create a JSON structure from a PySpark dataframe. I have the following columns in my dataframe: batch_id, batch_run_id, table_name, column_name, column_datatype, last_refresh_time, refresh_frequency, owner.
I want it in the JSON structure below:
{
"GeneralInfo": {
"DataSetID": "xxxx1234Abcsd",
"Owner" : ["[email protected]", "[email protected]", "[email protected]"]
"Description": "",
"BuisnessFunction": "",
"Source": "",
"RefreshRate": "Weekly",
"LastUpdate": "2020/10/15",
"InfoSource": "TemplateInfo"
},
"Tables": [
{
"TableName": "Employee",
"Columns" : [
{ "ColumnName" : "EmployeeID",
"ColumnDataType": "int"
},
{ "ColumnName" : "EmployeeName",
"ColumnDataType": "string"
}
]
}
]
}
I'm trying to assign the values in the JSON string through dataframe column references, but it gives me the error "Object of Type Column is not JSON serializable". I have used it like below:
{
"GeneralInfo": {
"DataSetID": df["batch_id"],
"Owner" : list(df["owner"])
"Description": "",
"BuisnessFunction": "",
"Source": "",
"RefreshRate": df["refresh_frequency"],
"LastUpdate": df["last_update_time"],
"InfoSource": "TemplateInfo"
},
"Tables": [
{
"TableName": df["table_name"],
"Columns" : [
{ "ColumnName" : df["table_name"]["column_name"],
"ColumnDataType": df["table_name"]["column_datatype"]
}
]
}
]
}
Please help me with this; I've only recently started coding in PySpark.
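For context on the error: `df["batch_id"]` in PySpark is a `Column` expression (a reference used to build queries), not a concrete value, and Python's `json` module can only serialize plain types such as strings, numbers, lists, and dicts. A minimal plain-Python sketch of the same failure, using a stand-in class rather than the real `pyspark.sql.Column`:

```python
import json

class Column:
    """Stand-in for pyspark.sql.Column, for illustration only."""
    pass

try:
    json.dumps({"DataSetID": Column()})
except TypeError as exc:
    print(exc)  # Object of type Column is not JSON serializable
```

To serialize actual values, the data must first be materialized, e.g. via `toJSON()` or `collect()`, as the answer below does.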
Upvotes: 2
Views: 3934
Reputation: 5487
I tried generating JSON from the sample data you provided; the output format does not match your expected structure exactly, but you can refine the code below further.
We can use the toJSON() function to convert a dataframe to JSON format. Before calling toJSON(), we build nested values with struct() and the collect_set()/collect_list() aggregate functions over the required columns, so the result matches the desired JSON shape.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, collect_set, struct

spark = SparkSession.builder.master('local[*]').getOrCreate()

# Sample rows matching the question's columns.
in_values = [
    (123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
     ['[email protected]', '[email protected]', '[email protected]']),
    (123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
     ['[email protected]', '[email protected]', '[email protected]'])
]
cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
        "last_update_time", "refresh_frequency", "Owner"]

# Add the constant columns that exist only in the target JSON.
df = spark.createDataFrame(in_values).toDF(*cols) \
    .selectExpr("*", "'' Description", "'' BusinessFunction",
                "'TemplateInfo' InfoSource", "'' Source")

# Columns that make up the "GeneralInfo" object, aliased to the JSON keys.
list1 = [df["batch_id"].alias("DataSetID"), df["Owner"], df["refresh_frequency"].alias("RefreshRate"),
         df["last_update_time"].alias("LastUpdate"), "Description", "BusinessFunction", "InfoSource", "Source"]
# Columns that make up each entry in the "Tables" array.
list2 = [df["table_name"].alias("TableName"), df["column_name"].alias("ColumnName"),
         df["column_datatype"].alias("ColumnDataType")]

# collect_set(...)[0] keeps one GeneralInfo struct per batch;
# collect_list(...) gathers every column row into the Tables array.
df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
         collect_list(struct(*list2)).alias("Tables")).drop("batch_id") \
    .toJSON().foreach(print)
# outputs JSON --->
'''
{
"GeneralInfo":{
"DataSetID":123,
"Owner":[
"[email protected]",
"[email protected]",
"[email protected]"
],
"RefreshRate":"Weekly",
"LastUpdate":"21/05/15",
"Description":"",
"BusinessFunction":"",
"InfoSource":"TemplateInfo",
"Source":""
},
"Tables":[
{
"TableName":"Employee",
"ColumnName":"Employee_id",
"ColumnDataType":"int"
},
{
"TableName":"Employee",
"ColumnName":"Employee_name",
"ColumnDataType":"string"
}
]
}
'''
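If you need the "Tables" entries nested as TableName/Columns exactly as the question asks, one option is a small post-processing step in plain Python over the parsed toJSON() output. A sketch, assuming `row` is the parsed form of the JSON printed above (GeneralInfo abbreviated here):

```python
import json

# Parsed (abbreviated) form of the toJSON() row printed above.
row = {
    "GeneralInfo": {"DataSetID": 123, "RefreshRate": "Weekly"},
    "Tables": [
        {"TableName": "Employee", "ColumnName": "Employee_id", "ColumnDataType": "int"},
        {"TableName": "Employee", "ColumnName": "Employee_name", "ColumnDataType": "string"},
    ],
}

# Group the flat entries by TableName into the nested Columns layout.
grouped = {}
for entry in row["Tables"]:
    grouped.setdefault(entry["TableName"], []).append(
        {"ColumnName": entry["ColumnName"], "ColumnDataType": entry["ColumnDataType"]}
    )
row["Tables"] = [{"TableName": name, "Columns": cols} for name, cols in grouped.items()]

print(json.dumps(row, indent=2))
```

This keeps the Spark job simple and does the final reshaping on the driver, which is fine when each grouped row is small.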
Upvotes: 4