Anup K

Reputation: 63

How to modify pyspark dataframe nested struct column

I am trying to anonymize/hash a nested column, but haven't been successful. The schema looks something like this:

root
 |-- abc: struct (nullable = true)
 |    |-- xyz: struct (nullable = true)
 |    |    |-- abc123: string (nullable = true)
 |    |    |-- services: struct (nullable = true)
 |    |    |    |-- service: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- subtype: string (nullable = true)

I need to change (anonymize/hash) the value of the type column.

Upvotes: 6

Views: 4363

Answers (2)

user2314737

Reputation: 29307

With the hash_field function from the pyspark-nested-functions library you can hash any nested field, e.g. hash_field(df, "abc.xyz.services.type"):

from pyspark.sql import Row
df = spark.createDataFrame([
    Row(abc=Row(xyz=Row(abc123="value123", services=[Row(type="type1", subtype="subtype1")])))
])

df.show(truncate=False)
# +---------------------------------+
# |abc                              |
# +---------------------------------+
# |{{value123, [{type1, subtype1}]}}|
# +---------------------------------+

from nestedfunctions.functions.hash import hash_field

hashed_df = hash_field(df, "abc.xyz.services.type", num_bits=256)
hashed_df.show(truncate=False)
# +--------------------------------------------------------------------------------------------+
# |abc                                                                                         |
# +--------------------------------------------------------------------------------------------+
# |{{value123, [{ba5857c2e8a7c12df14097eaa5ffb1c97976b9d433fe63a65df84849c5eea0ec, subtype1}]}}|
# +--------------------------------------------------------------------------------------------+
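
The 64-character digest above is consistent with SHA-256. Assuming hash_field with num_bits=256 computes a plain SHA-2 hex digest of the string (an assumption about the library's internals), Spark's built-in sha2 should reproduce it:

import pyspark.sql.functions as F

# Assumption: hash_field(..., num_bits=256) equals a plain SHA-256 hex digest
spark.range(1).select(F.sha2(F.lit("type1"), 256).alias("digest")).show(truncate=False)
# If the assumption holds, this prints the digest shown in the table above.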

Upvotes: 0

blackbishop

Reputation: 32650

For Spark 3.1+, there is a column method withField that can be used to update struct fields.
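A minimal sketch of the method on a flat struct column (hypothetical column s with fields a and b):

import pyspark.sql.functions as F

# Replace only field "a" inside struct column "s"; field "b" is left untouched
df.withColumn("s", F.col("s").withField("a", F.lit(0)))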

Assuming this is your input dataframe (corresponding to the schema you provided):

from pyspark.sql import Row

df = spark.createDataFrame([
    Row(abc=Row(xyz=Row(abc123="value123", services=[Row(type="type1", subtype="subtype1")])))
])

df.show(truncate=False)
#+---------------------------------+
#|abc                              |
#+---------------------------------+
#|{{value123, [{type1, subtype1}]}}|
#+---------------------------------+

You can achieve that using transform on the services array to hash the type field of each struct element (here I used the xxhash64 function to illustrate), like this:

import pyspark.sql.functions as F

df2 = df.withColumn(
    "abc",
    F.col("abc").withField(
        "xyz",
        F.col("abc.xyz").withField(
            "services",
            F.expr("transform(abc.xyz.services, x -> struct(xxhash64(x.type) as type, x.subtype))")
        )
    )
)

df2.show(truncate=False)
#+-----------------------------------------------+
#|abc                                            |
#+-----------------------------------------------+
#|{{value123, [{2134479862461603894, subtype1}]}}|
#+-----------------------------------------------+
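
Note that xxhash64 is fast but not cryptographic. If you need a stronger hash for anonymization, the same pattern should work with Spark's built-in sha2 (a sketch, swapping only the hash function):

df2_sha = df.withColumn(
    "abc",
    F.col("abc").withField(
        "xyz",
        F.col("abc.xyz").withField(
            "services",
            # sha2(x.type, 256) returns the SHA-256 digest as a hex string
            F.expr("transform(abc.xyz.services, x -> struct(sha2(x.type, 256) as type, x.subtype))")
        )
    )
)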

For older Spark versions, you'll need to recreate the whole struct in order to update a field, which gets tedious when there are many nested fields. In your case it would be like this:

df2 = df.withColumn(
    "abc",
    F.struct(
        F.struct(
            F.col("abc.xyz.abc123"),
            F.expr(
                "transform(abc.xyz.services, x -> struct(xxhash64(x.type) as type, x.subtype))"
            ).alias("services")
        ).alias("xyz")
    )
)
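
This gives the same result as the withField version above:

df2.show(truncate=False)
#+-----------------------------------------------+
#|abc                                            |
#+-----------------------------------------------+
#|{{value123, [{2134479862461603894, subtype1}]}}|
#+-----------------------------------------------+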

Upvotes: 6
