surj

Reputation: 4904

How to add column to exploded struct in Spark?

Say I have the following data:

{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]}

I would like to explode the payload and add a column to it, like this:

df = df.select('id', F.explode('payload').alias('data'))
df = df.withColumn('data.bar', F.col('data.foo') * 2)

However, this results in a DataFrame with three top-level columns (id, data, and a column literally named data.bar).

I expected the data.bar to be part of the data struct...

How can I add a column to the exploded struct, instead of adding a top-level column?

Upvotes: 1

Views: 893

Answers (1)

Zhang Tong

Reputation: 4719

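from pyspark.sql import functions as f

# Rebuild the struct under the same column name, adding the new field
df = df.withColumn('data', f.struct(
    df['data']['foo'].alias('foo'),
    (df['data']['foo'] * 2).alias('bar')
))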

This will result in:

root
 |-- id: long (nullable = true)
 |-- data: struct (nullable = false)
 |    |-- foo: long (nullable = true)
 |    |-- bar: long (nullable = true)

UPDATE:

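from pyspark.sql import functions as f
from pyspark.sql.types import LongType, StructField, StructType

def func(x):
    # x is the struct as a Row; convert it to a dict so it can be modified
    tmp = x.asDict()
    tmp['foo'] = tmp.get('foo', 0) * 100
    # Return the values in the same order as the schema declared below
    return (tmp['foo'], tmp['lol'])

# The fields are longs in the source data, so declare them as LongType
schema = StructType([StructField('foo', LongType()), StructField('lol', LongType())])

df = df.withColumn('data', f.udf(func, schema)(df['data']))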

P.S.

Spark has almost no support for in-place operations.

So whenever you want to change something in place, you actually have to replace it with a new value.

Upvotes: 1
