Reputation: 1449
I'm reading in some JSON of the form:
{"a": [{"b": {"c": 1, "d": 2}}]}
That is, the array items are unnecessarily nested. Now, because this happens inside an array, the answers given in "How to flatten a struct in a Spark dataframe?" don't apply directly.
This is how the dataframe looks when parsed:
root
|-- a: array
| |-- element: struct
| | |-- b: struct
| | | |-- c: integer
| | | |-- d: integer
I'm looking to transform the dataframe into this:
root
|-- a: array
| |-- element: struct
| | |-- b_c: integer
| | |-- b_d: integer
How do I go about aliasing the columns inside the array to effectively unnest it?
Upvotes: 3
Views: 2543
Reputation: 487
Simplified Approach:
from pyspark.sql.functions import col
def flatten_df(nested_df):
    """Return ``nested_df`` with every nested struct column expanded.

    Struct columns are walked with an explicit work list instead of
    recursion.  Each non-struct leaf is selected by its dotted path
    (``parent.child``) and aliased to the same path joined with
    underscores (``parent_child``).  Columns whose dtype string does not
    start with ``"struct"`` (arrays included) are selected as they are.
    """
    selected = []
    # Each entry pairs the tuple of parent column names with the dataframe
    # whose columns live directly underneath that path.
    pending = [((), nested_df)]

    while pending:
        prefix, frame = pending.pop()
        struct_names = []

        for name, type_name in frame.dtypes:
            if type_name.startswith("struct"):
                struct_names.append(name)
                continue
            path = prefix + (name,)
            selected.append(col(".".join(path)).alias("_".join(path)))

        # Project each struct's children so they are inspected on a later
        # iteration of the outer loop.
        for name in struct_names:
            pending.append((prefix + (name,), frame.select(name + ".*")))

    return nested_df.select(selected)
ref: https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
Upvotes: 1
Reputation: 1449
Using the method presented in the accepted answer I wrote a function to recursively unnest a dataframe (recursing into nested arrays as well):
from pyspark.sql.types import ArrayType, StructType
def flatten(df, sentinel="x"):
    """Flatten nested structs in ``df``, including structs inside arrays.

    A single Spark SQL ``struct(...)`` expression is generated from
    ``df.schema`` and then expanded back into top-level columns:

    * a struct field is replaced by its children, named with the parent
      path joined by underscores (``b.c`` becomes ``b_c``);
    * an array whose elements are structs is rewritten with
      ``transform(<array>, <sentinel> -> struct(...))`` so that every
      element is flattened the same way;
    * any other field is selected unchanged under its flattened name.

    Args:
        df: The dataframe to flatten.
        sentinel: Name used both for the lambda variable inside
            ``transform`` and for the temporary wrapper struct column.

    Returns:
        A dataframe with the flattened columns.
    """

    def _render(entries):
        # Turn per-column entries into (indent, text) lines, appending a
        # comma to the final line of every entry except the last one.
        # Separators are derived from the entries actually produced, so a
        # struct without fields (which contributes no entry) can never
        # leave a dangling comma behind.
        entries = list(entries)
        rendered = []
        for position, entry in enumerate(entries, start=1):
            *head, (tail_indent, tail_text) = entry
            rendered.extend(head)
            separator = "," if position < len(entries) else ""
            rendered.append((tail_indent, tail_text + separator))
        return rendered

    def _column_entries(schema, indent, parents, transform=False):
        # Yield one list of (indent, text) lines per flattened output column.
        for field in schema.fields:
            path = parents + (field.name,)
            # Inside a transform lambda the path starts with the lambda
            # variable, which must not leak into the output column name.
            alias = " as " + "_".join(path[1:] if transform else path)
            data_type = field.dataType

            if isinstance(data_type, StructType):
                # Children take the place of the struct itself.
                yield from _column_entries(data_type, indent, path, transform)
            elif (
                isinstance(data_type, ArrayType)
                and isinstance(data_type.elementType, StructType)
            ):
                element_lines = _render(
                    _column_entries(
                        data_type.elementType,
                        indent + 2,
                        (sentinel,),
                        True,
                    )
                )
                yield [
                    (indent, "transform("),
                    (indent + 1, ".".join(path) + ","),
                    (indent + 1, sentinel + " -> struct("),
                    *element_lines,
                    (indent + 1, ")"),
                    (indent, ")" + alias),
                ]
            else:
                yield [(indent, ".".join(path) + alias)]

    lines = [
        " " * 4 * indent + text
        for indent, text in _render(_column_entries(df.schema, 0, ()))
    ]
    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    # Wrap everything in one struct, then expand it into top-level columns.
    return df.selectExpr(expr).select(sentinel + ".*")
Upvotes: 1
Reputation: 42342
You can use transform:
# Rebuild every element x of array column `a` as a struct whose fields are
# x.b.c (named b_c) and x.b.d (named b_d), keeping the column name `a`.
df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
Upvotes: 3