WaeCo

Reputation: 1217

Append a field to a row in pyspark

I have a DataFrame and am running a flatMap on it. Inside the map function, I am trying to append a new field to the given row.

How can I do that?

def mapper(row):
    value = 0 #some computation here
    row.append(newvalue = value) #??? something like that
    return row

data = sqlContext.jsonFile("data.json")
mapped = data.flatMap(mapper)
#do further mappings with the new field

Upvotes: 3

Views: 3385

Answers (2)

Douglas Penna

Reputation: 101

Following your lead, I created something more flexible; I hope it helps:

from pyspark.sql import Row

def addRowColumn(row, **kwargs):
    # Copy the Row's fields into a dict, merge in the new columns,
    # and rebuild an immutable Row from the result.
    rowData = row.asDict()
    for column in kwargs:
        rowData[column] = kwargs[column]
    return Row(**rowData)

To use it on a single row, call it like this:

modifiedRow = addRowColumn(originalRow, test="Hello Column!")

To run on the entire dataset, just create a UDF to change each row.
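
For illustration, here is a hedged sketch of a different way to apply addRowColumn across every row: mapping over the DataFrame's underlying RDD and rebuilding the DataFrame afterwards (the column name test and the names rows/newData are just examples, not from the original post):

rows = data.rdd.map(lambda row: addRowColumn(row, test="Hello Column!"))
newData = sqlContext.createDataFrame(rows)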

Upvotes: 1

WaeCo

Reputation: 1217

Figured it out, but I am not sure if it is the correct way.

def mapper(row):
    from pyspark.sql import Row  # imported inside the function so it is available on the workers
    value = 0  # some computation here
    data = row.asDict()  # convert the immutable Row to a plain dict
    data["newvalue"] = value  # add the new field
    return Row(**data)  # rebuild a Row that includes the new field
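
As a hedged usage sketch (not from the original post): because mapper returns a single Row, map keeps each row intact, whereas flatMap would iterate over the Row's values and flatten them:

mapped = data.rdd.map(mapper)
withNewField = sqlContext.createDataFrame(mapped)  # hypothetical name for the rebuilt DataFrame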

Upvotes: 0
