Smaillns

Reputation: 3127

Pyspark - create a new column with StructType using UDF

df.withColumn("event", ..)

How can I add a new column, event, to a DataFrame, holding the result of generate_header? Can a Row be used as the column value?

Maybe the function needs to be converted to a UDF.

def generate_header(df_row):
    
    header = {
        "id": 1,
        ...
    }

    return EntityEvent(header, df_row)


class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload

Let's suppose we have something like this

+---------------+--------------------+
|book_id        |Author              |
+---------------+--------------------+
|865731         |{name: 'A',  }      |
+---------------+--------------------+

and we want to get this

+---------------+--------------------+------------------------------------------------+
|book_id        |Author              |event                                           |
+---------------+--------------------+------------------------------------------------+
|865731         |{name: 'A',  }      |{header: {id: '865731'}, payload: {name: 'A'}}  |
+---------------+--------------------+------------------------------------------------+

Upvotes: 1

Views: 1155

Answers (1)

Emma

Reputation: 9308

You can use create_map to build a MapType column.

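from pyspark.sql import functions as F

# create_map needs all values to share one type, so this assumes Author
# is a map column with the same type as the inner header map.
df.withColumn('event', F.create_map(
    F.lit('header'), F.create_map(F.lit('id'), F.col('book_id')),
    F.lit('payload'), F.col('Author'),
))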

FYI: you probably cannot store a Python object in a Spark column. See "Is it possible to store custom class object in Spark Data Frame as a column value?"
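In practice this means an object like EntityEvent has to be flattened into plain dicts and strings before it can go into a column. A minimal sketch in plain Python, assuming the EntityEvent class from the question and assuming header and payload are both plain dicts (in the question the payload is a DataFrame row, so this is a simplification). The helper name event_to_dict is hypothetical.

```python
class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload


def event_to_dict(event):
    """Flatten an EntityEvent into nested dicts with string keys and values."""
    return {
        "header": {str(k): str(v) for k, v in event.header.items()},
        "payload": {str(k): str(v) for k, v in event.payload.items()},
    }


event = EntityEvent({"id": 865731}, {"name": "A"})
print(event_to_dict(event))
```

A nested dict of strings like this lines up with a map-of-maps column type, whereas the class instance itself does not map to any Spark type.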

Update:

If you need to derive part of the value with Python library functions, you can use a UDF:

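import base64

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# F.udf takes the return type schema as its argument.
@F.udf(MapType(StringType(), MapType(StringType(), StringType())))
def generate_header(book_id, author):
    # b64encode returns bytes; decode so the value matches StringType.
    b64str = base64.b64encode('some text'.encode('utf-8')).decode('utf-8')
    return {
        'header': { 'id': str(book_id), 'key': b64str },
        # Assumes Author is a map<string, string> column.
        'payload': author
    }

df.withColumn('event', generate_header(F.col('book_id'), F.col('Author')))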
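The dict-building logic can be checked without a Spark session by keeping it in a plain function first. A rough sketch, assuming the same 'some text' placeholder as in the UDF above and a dict payload. The name build_event is hypothetical, and the values are converted to str so they fit a map of StringType values.

```python
import base64


def build_event(book_id, author):
    """Build the nested event dict using only str values."""
    # b64encode returns bytes, so decode to get a str
    key = base64.b64encode("some text".encode("utf-8")).decode("utf-8")
    return {
        "header": {"id": str(book_id), "key": key},
        "payload": author,
    }


print(build_event(865731, {"name": "A"}))
```

Once the plain function behaves as expected, it can be wrapped with F.udf and the return schema as in the snippet above.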

Upvotes: 1
