Reputation: 3127
df.withColumn("event", ..)
How can I add a new column event to a dataframe whose value is the result of generate_header? How can we add a Row as the column value? Maybe we need to convert the function to a UDF?
def generate_header(df_row):
    header = {
        "id": 1,
        ...
    }
    return EntityEvent(header, df_row)
class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload
Let's suppose we have something like this
+---------------+--------------------+
|book_id |Author |
+---------------+--------------------+
|865731 |{name: 'A', } |
+---------------+--------------------+
and we want to get this
+---------------+--------------------+----------------------------------------------+
|book_id        |Author              |event                                         |
+---------------+--------------------+----------------------------------------------+
|865731         |{name: 'A', }       |{header: {id: '865731'}, payload: {name: 'A'}}|
+---------------+--------------------+----------------------------------------------+
Upvotes: 1
Views: 1155
Reputation: 9308
You can use create_map to build a MapType column.
from pyspark.sql import functions as F

df = df.withColumn('event', F.create_map(
    F.lit('header'), F.create_map(F.lit('id'), F.col('book_id')),
    F.lit('payload'), F.col('Author')
))
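For intuition, the nested map built above has the same shape as this plain-Python structure. This is only a sketch using the sample values from the question's example row; in Spark the values come from the book_id and Author columns per row:

```python
# Plain-Python equivalent of the nested map produced by create_map,
# using the sample row from the question.
book_id = '865731'
author = {'name': 'A'}

event = {
    'header': {'id': book_id},
    'payload': author,
}

print(event)  # {'header': {'id': '865731'}, 'payload': {'name': 'A'}}
```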
FYI: you generally cannot store a plain Python object in a Spark column. See: Is it possible to store custom class object in Spark Data Frame as a column value?
Update:
If you need to derive a part of the value using Python library functions, you can use a UDF instead:
import base64
from pyspark.sql.types import MapType, StringType

# The udf decorator takes the return type schema.
@F.udf(MapType(StringType(), MapType(StringType(), StringType())))
def generate_header(book_id, author):
    # b64encode returns bytes; decode to str so it fits StringType.
    b64str = base64.b64encode('some text'.encode('utf-8')).decode('utf-8')
    return {
        'header': {'id': book_id, 'key': b64str},
        'payload': author
    }
df.withColumn('event', generate_header(F.col('book_id'), F.col('Author')))
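Outside of Spark, the UDF body behaves like an ordinary function, so you can sanity-check it locally. A minimal sketch for the example row, with the decorator removed and using the placeholder 'some text' from the snippet above:

```python
import base64

def generate_header_plain(book_id, author):
    # Same body as the UDF, minus the decorator: base64-encode a value
    # and decode back to str so it matches MapType(StringType(), ...).
    b64str = base64.b64encode('some text'.encode('utf-8')).decode('utf-8')
    return {
        'header': {'id': book_id, 'key': b64str},
        'payload': author,
    }

event = generate_header_plain('865731', {'name': 'A'})
print(event['header']['key'])  # c29tZSB0ZXh0
```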
Upvotes: 1