Reputation: 1217
I have a DataFrame and am running a flatMap on it. Inside the map function I am trying to append a new field to the given row. How can I do that?
def mapper(row):
    value = 0  # some computation here
    row.append(newvalue=value)  # ??? something like that
    return row

data = sqlContext.jsonFile("data.json")
mapped = data.flatMap(mapper)
# do further mappings with the new field
Upvotes: 3
Views: 3385
Reputation: 101
Following your lead, I created something more flexible; I hope it helps:
from pyspark.sql import Row
def addRowColumn(row, **kwargs):
    rowData = row.asDict()
    for column in kwargs:
        rowData[column] = kwargs[column]
    return Row(**rowData)
And to use it on a single row, just call it like this:
modifiedRow = addRowColumn(originalRow, test="Hello Column!")
To run it on the entire dataset, just create a UDF to change each row.
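For example, here is a minimal sketch of applying it across the whole dataset by mapping over the underlying RDD instead of going through a SQL UDF; the column name newvalue and the constant value are placeholders for your own computation:

# assumes `data` is the DataFrame from the question and that
# addRowColumn is defined as above
mapped = data.rdd.map(lambda row: addRowColumn(row, newvalue=0))
# mapped is an RDD of Rows with the extra field; convert back with
# sqlContext.createDataFrame(mapped) if you need a DataFrame again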
Upvotes: 1
Reputation: 1217
Figured it out, but I am not sure if it is the correct way.
def mapper(row):
    from pyspark.sql import Row  # imported here so it is available on the workers
    value = 0  # some computation here
    data = row.asDict()
    data["newvalue"] = value
    return Row(**data)
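A usage sketch for wiring this back into the pipeline from the question (not from the original post): since mapper returns a single Row rather than an iterable of rows, map is the natural fit rather than flatMap, and going through data.rdd also works on Spark versions where DataFrame has no map of its own.

mapped = data.rdd.map(mapper)  # one output Row per input Row
withNewField = sqlContext.createDataFrame(mapped)  # back to a DataFrame
# do further mappings with the new field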
Upvotes: 0