Reputation: 163
I have some sample PySpark code in which I am trying to generate a JSON structure. Below is the code:
def func(row):
    """Serialize a Spark Row into an Avro-style record envelope.

    The row's columns become a list of single-key dicts under "fields";
    the result is returned as a JSON string.
    """
    record = row.asDict()
    envelope = {}
    envelope['type'] = "record"
    envelope['name'] = "source"
    envelope['namespace'] = "com.streaming.event"
    envelope['doc'] = "SCD signals from source"
    # One {column: value} entry per field, in the row-dict's iteration order.
    envelope['fields'] = [{column: value} for column, value in record.items()]
    return json.dumps(envelope)
if __name__ == "__main__":
    # Local Spark session for this demo.
    spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()

    # Wrap the row-serializer as a string-returning UDF.
    payload = udf(func, StringType())

    # Sample rows; keep the types consistent within each column.
    rows = [
        (1, "a", 'foo1'),
        (2, "b", 'bar'),
        (3, "c", 'mnc'),
    ]
    data = spark.createDataFrame(rows, ['id', 'nm', 'txt'])

    # Pack every column into a struct so the UDF receives the whole row.
    whole_row = struct([data[c] for c in data.columns])
    df = data.withColumn("payload1", payload(whole_row))
    df.show(3, False)
I am getting an error while inserting data into the DataFrame:
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple '{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}' with StructType
If I print the JSON payload directly, I get the correct output:
{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"}
I have also verified this is a valid json.
I am not sure what I am missing here.
Could this be a Python version issue? I am using Python 2.7.
Update: I tried to run the exact same code using Python 3.7 and it runs fine.
Upvotes: 0
Views: 255
Reputation: 2072
It works for me in Spark 3.x with Python 2.7.x:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.0
/_/
Using Python version 2.7.17 (default, Jul 20 2020 15:37:01)
SparkSession available as 'spark'.
Results from the PySpark shell:
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
def func(row):
    """Build a JSON string describing *row* as an Avro-style record.

    Each column of the row is emitted as its own {name: value} dict in
    the "fields" list of the returned record envelope.
    """
    head = {
        "type": "record",
        "name": "source",
        "namespace": "com.streaming.event",
        "doc": "SCD signals from source",
        # Same insertion order as assigning "fields" last.
        "fields": [{name: value} for name, value in row.asDict().items()],
    }
    return json.dumps(head)
spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()
payload=udf(func,StringType())
data = spark.createDataFrame([(1, "a", 'foo1'), (2, "b", 'bar'), (3, "c", 'mnc')],['id', 'nm', 'txt'])
data.show()
'''
+---+---+----+
| id| nm| txt|
+---+---+----+
| 1| a|foo1|
| 2| b| bar|
| 3| c| mnc|
+---+---+----+
'''
df=data.withColumn("payload1",payload(struct([data[x] for x in data.columns])))
df.show(3,False)
'''
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |nm |txt |payload1 |
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |a |foo1|{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "foo1"}, {"id": 1}, {"nm": "a"}], "doc": "SCD signals from source"}|
|2 |b |bar |{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "bar"}, {"id": 2}, {"nm": "b"}], "doc": "SCD signals from source"} |
|3 |c |mnc |{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from source"} |
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
'''
Upvotes: 1