Reputation: 1739
I'm trying to create a JSON structure from a PySpark dataframe. I have the below columns in my dataframe: batch_id, batch_run_id, table_name, column_name, column_datatype, last_refresh_time, refresh_frequency, owner
I want it in the below JSON structure:
{
  "GeneralInfo": {
    "DataSetID": "xxxx1234Abcsd",
    "Owner": ["test1@email.com", "test2@email.com", "test3@email.com"],
    "Description": "",
    "BusinessFunction": "",
    "Source": "",
    "RefreshRate": "Weekly",
    "LastUpdate": "2020/10/15",
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": "Employee",
      "Columns": [
        {
          "ColumnName": "EmployeeID",
          "ColumnDataType": "int"
        },
        {
          "ColumnName": "EmployeeName",
          "ColumnDataType": "string"
        }
      ]
    }
  ]
}
I'm trying to assign the values in the JSON string through dataframe column references, but it gives me the error "Object of type Column is not JSON serializable". I tried like below:
{
  "GeneralInfo": {
    "DataSetID": df["batch_id"],
    "Owner": list(df["owner"]),
    "Description": "",
    "BusinessFunction": "",
    "Source": "",
    "RefreshRate": df["refresh_frequency"],
    "LastUpdate": df["last_update_time"],
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": df["table_name"],
      "Columns": [
        {
          "ColumnName": df["table_name"]["column_name"],
          "ColumnDataType": df["table_name"]["column_datatype"]
        }
      ]
    }
  ]
}
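For reference, a minimal sketch (using a toy two-column dataframe, names hypothetical) that reproduces the same error: df["batch_id"] is an unevaluated Column expression, not a plain Python value, so json.dumps cannot serialize it.

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

# toy dataframe with a couple of the columns from the question
df = spark.createDataFrame([(123, 'Weekly')], ['batch_id', 'refresh_frequency'])

# df["batch_id"] is a Column expression, not a concrete value
json.dumps({"DataSetID": df["batch_id"]})
# TypeError: Object of type Column is not JSON serializable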
Please help me with this; I have just started coding in PySpark.
Upvotes: 2
Views: 3942
Reputation: 5487
I tried generating JSON from the sample data you provided; the output format does not match exactly what you expected, but you can improve the code below further.
We can use the toJSON function to convert a dataframe to JSON format. Before calling toJSON, we need to build nested columns with struct() and aggregate them with collect_set()/collect_list(), passing the required columns so the output matches the JSON format you need.
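Before the full solution, here is a minimal sketch of that idea (on a hypothetical two-column dataframe): wrapping flat columns in struct() makes them serialize as one nested JSON object.

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

spark = SparkSession.builder.master('local[*]').getOrCreate()
demo = spark.createDataFrame([(123, 'Weekly')], ['batch_id', 'refresh_frequency'])

# struct() packs the two flat columns into a single nested column
demo.select(struct('batch_id', 'refresh_frequency').alias('GeneralInfo')) \
    .toJSON().foreach(print)
# {"GeneralInfo":{"batch_id":123,"refresh_frequency":"Weekly"}}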
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, collect_set, struct

spark = SparkSession.builder.master('local[*]').getOrCreate()

# Sample rows shaped like the question's dataframe
in_values = [
    (123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com']),
    (123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com'])
]

cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
        "last_update_time", "refresh_frequency", "Owner"]

# Add the constant columns the target JSON expects
df = spark.createDataFrame(in_values).toDF(*cols) \
    .selectExpr("*", "'' Description", "'' BusinessFunction",
                "'TemplateInfo' InfoSource", "'' Source")

# Columns that make up the GeneralInfo object
list1 = [df["batch_id"].alias("DataSetID"), df["Owner"],
         df["refresh_frequency"].alias("RefreshRate"),
         df["last_update_time"].alias("LastUpdate"),
         "Description", "BusinessFunction", "InfoSource", "Source"]

# Columns that make up each entry of the Tables array
list2 = [df["table_name"].alias("TableName"), df["column_name"].alias("ColumnName"),
         df["column_datatype"].alias("ColumnDataType")]

# collect_set dedupes the identical GeneralInfo structs across rows; [0] picks the one left.
# collect_list keeps one struct per source row, which becomes the Tables array.
df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
         collect_list(struct(*list2)).alias("Tables")) \
    .drop("batch_id") \
    .toJSON().foreach(print)
# outputs JSON --->
'''
{
  "GeneralInfo": {
    "DataSetID": 123,
    "Owner": [
      "test1@gmail.com",
      "test1@gmail.com",
      "test3@gmail.com"
    ],
    "RefreshRate": "Weekly",
    "LastUpdate": "21/05/15",
    "Description": "",
    "BusinessFunction": "",
    "InfoSource": "TemplateInfo",
    "Source": ""
  },
  "Tables": [
    {
      "TableName": "Employee",
      "ColumnName": "Employee_id",
      "ColumnDataType": "int"
    },
    {
      "TableName": "Employee",
      "ColumnName": "Employee_name",
      "ColumnDataType": "string"
    }
  ]
}
'''
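To nest the Columns array inside each table exactly as the question shows, one possible improvisation (a rough sketch building on df and list1 above, not tested end-to-end) is a two-stage aggregation: collect the columns per table first, then collect the tables per batch and join GeneralInfo back on.

from pyspark.sql.functions import col

# Stage 1: one row per (batch_id, table_name) carrying that table's Columns array
per_table = df.groupBy("batch_id", "table_name") \
    .agg(collect_list(struct(df["column_name"].alias("ColumnName"),
                             df["column_datatype"].alias("ColumnDataType")))
         .alias("Columns"))

# Stage 2: fold the tables of each batch into a single Tables array
tables = per_table.groupBy("batch_id") \
    .agg(collect_list(struct(col("table_name").alias("TableName"),
                             col("Columns"))).alias("Tables"))

# Rebuild GeneralInfo as above and join the two halves back together
general = df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"))

general.join(tables, "batch_id") \
    .select("GeneralInfo", "Tables") \
    .toJSON().foreach(print)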
Upvotes: 4