Reputation: 47
Suppose I have the following Spark dataframe (df):
from datetime import datetime
import pandas as pd
ts = [datetime(2022, 1, 1, 0), datetime(2022, 1, 1, 1), datetime(2022, 1, 1, 2)]
pandas_df = pd.DataFrame({"Timestamp": ts, "column_a": ["a", "b", "c"], "column_b": [1.1, 2.2, 3.3], "column_c": [5, 6, 7]})
df = spark.createDataFrame(pandas_df)
df = df.withColumn("column_c", df.column_c.cast("int"))
which would look like this:
Timestamp column_a column_b column_c
0 2022-01-01 00:00:00 a 1.1 5
1 2022-01-01 01:00:00 b 2.2 6
2 2022-01-01 02:00:00 c 3.3 7
Now I would like to unpivot it using PySpark and get the result below:
Timestamp Columns string double int
2022-01-01T02:00:00.000+0000 column_c null null 7
2022-01-01T02:00:00.000+0000 column_b null 3.3 null
2022-01-01T02:00:00.000+0000 column_a c null null
2022-01-01T01:00:00.000+0000 column_c null null 6
2022-01-01T01:00:00.000+0000 column_b null 2.2 null
2022-01-01T01:00:00.000+0000 column_a b null null
2022-01-01T00:00:00.000+0000 column_c null null 5
2022-01-01T00:00:00.000+0000 column_b null 1.1 null
2022-01-01T00:00:00.000+0000 column_a a null null
One way I can think of is unpivoting each type separately and then joining the resulting dataframes:
import pyspark.sql.functions as func

final_df = None
for data_type in ["string", "double", "int"]:
    # columns of this Spark SQL type
    dtype_cols = [name for name, dtype in df.dtypes if dtype == data_type]
    stack_str = ", ".join(f"'{name}', {name}" for name in dtype_cols)
    expression = f"stack({len(dtype_cols)}, {stack_str}) as (Columns, {data_type})"
    untransposed_df = df.select("Timestamp", func.expr(expression))
    if final_df is None:
        final_df = untransposed_df
    else:
        final_df = final_df.join(untransposed_df, ["Timestamp", "Columns"], how="outer")
This doesn't look very efficient as it involves multiple joins. Is there a better way of doing this?
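One join-free variation I sketched (untested; names like value_cols and unpivoted_df are just placeholders) builds a single stack() expression where every row carries all three typed slots, padding the slots that don't match a column's type with typed nulls:
import pyspark.sql.functions as func

value_types = ["string", "double", "int"]
value_cols = [(name, dtype) for name, dtype in df.dtypes if name != "Timestamp"]

rows = []
for name, dtype in value_cols:
    # pad the slots that don't match this column's type with typed nulls
    slots = ", ".join(name if dtype == t else f"cast(null as {t})" for t in value_types)
    rows.append(f"'{name}', {slots}")

expression = f"stack({len(value_cols)}, {', '.join(rows)}) as (Columns, {', '.join(value_types)})"
unpivoted_df = df.select("Timestamp", func.expr(expression))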
Thank you!
Upvotes: 0
Views: 725
Reputation: 5155
With more time here's the complete answer:
from pyspark.sql.functions import array, col, explode, lit, struct

# use a list comprehension to pull out the fields we will need (everything except the timestamp)
no_timestamp_columns = [field for field in df.schema.fields if field.name != 'timestamp']

df.select(
    col("timestamp"),
    explode(  # make many rows from this array
        array(*[  # use a list comprehension to build the array
            struct(  # group the data we want as columns together
                lit(column.name).alias("column"),  # the name of the column this row came from
                *[
                    # one slot per data type: null unless this is the column
                    # we are working with, in which case it holds the value
                    (lit(None) if value.name != column.name else col(column.name))
                        .cast(value.dataType)
                        .alias(str(value.dataType))
                    for value in no_timestamp_columns
                ]
            ) for column in no_timestamp_columns
        ])
    ).alias("columns")
).select(
    col("timestamp"),
    col("columns.*"),
).show()
+--------------------+------+----------+--------+
| timestamp|column|StringType|LongType|
+--------------------+------+----------+--------+
|2022-01-28 23:32:...| Col1| str1| null|
|2022-01-28 23:32:...| Col2| null| 100|
|2022-02-28 23:02:...| Col1| str2| null|
|2022-02-28 23:02:...| Col2| null| 202|
|2022-02-28 17:22:...| Col1| str3| null|
|2022-02-28 17:22:...| Col2| null| 102|
|2022-02-28 23:19:...| Col1| str4| null|
|2022-02-28 23:19:...| Col2| null| 102|
|2022-03-29 17:32:...| Col1| str5| null|
|2022-03-29 17:32:...| Col2| null| 102|
|2022-01-28 23:32:...| Col1| str6| null|
|2022-01-28 23:32:...| Col2| null| 101|
|2022-02-28 17:28:...| Col1| str7| null|
|2022-02-28 17:28:...| Col2| null| 201|
|2022-03-28 23:59:...| Col1| str8| null|
|2022-03-28 23:59:...| Col2| null| 100|
|2022-02-28 21:02:...| Col1| str9| null|
|2022-02-28 21:02:...| Col2| null| 100|
+--------------------+------+----------+--------+
I should note that if you use an array, you have to keep the shape of the structs consistent. That was what took me the longest to think through. You could either cast all columns to StringType and work out how to cast them back in a later select, or you can take the approach above: keep the original type of each column but fill in nulls in a consistent manner.
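For completeness, here's a minimal sketch of that string-cast variant (untested; the consumer still has to cast values back based on the column name):
from pyspark.sql.functions import array, col, explode, lit, struct

value_fields = [field for field in df.schema.fields if field.name != 'timestamp']
df.select(
    col("timestamp"),
    explode(array(*[
        struct(
            lit(field.name).alias("column"),
            col(field.name).cast("string").alias("value")  # one uniform type, so no null padding needed
        ) for field in value_fields
    ])).alias("columns")
).select(col("timestamp"), col("columns.*")).show()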
Upvotes: 1
Reputation: 5155
Here's most of the answer. Admittedly it isn't in the exact format you asked for, but it follows the spirit; it's just missing some nulls. You could likely refine the logic a little more to produce the format you're interested in, but I'm not sure the investment is worth it. With a little more time you could build more substructure into the struct of column values and use the exploded column name to select only the matching value (a sketch of that refinement follows the output below), but this gives you a general idea of how to go about it.
from pyspark.sql.functions import array, col, explode, lit, struct
>>> data = [
... ("2022-01-28 23:32:52.0","str1",100),
... ("2022-02-28 23:02:40.0","str2",202),
... ("2022-02-28 17:22:45.0","str3",102),
... ("2022-02-28 23:19:37.0","str4",102),
... ("2022-03-29 17:32:02.0","str5",102),
... ("2022-01-28 23:32:40.0","str6",101),
... ("2022-02-28 17:28:09.0","str7",201),
... ("2022-03-28 23:59:54.0","str8",100),
... ("2022-02-28 21:02:40.0","str9",100),
... ]
>>>
>>> df = spark.createDataFrame(data)
>>> df = df.toDF("timestamp", "Col1", "Col2")
>>> df.show()
+--------------------+----+----+
| timestamp|Col1|Col2|
+--------------------+----+----+
|2022-01-28 23:32:...|str1| 100|
|2022-02-28 23:02:...|str2| 202|
|2022-02-28 17:22:...|str3| 102|
|2022-02-28 23:19:...|str4| 102|
|2022-03-29 17:32:...|str5| 102|
|2022-01-28 23:32:...|str6| 101|
|2022-02-28 17:28:...|str7| 201|
|2022-03-28 23:59:...|str8| 100|
|2022-02-28 21:02:...|str9| 100|
+--------------------+----+----+
df.select(
    col("timestamp"),
    array(  # build an array of the column names
        *[lit(field.name) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("columns"),
    struct(  # build a struct of the inverted values
        # use a list comprehension to pass varargs in the format we want
        *[col(field.name).alias(str(field.dataType)) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("data"))\
.select(
    col("timestamp"),
    explode(
        col("columns")
    ),
    col("data.*"))\
.show()
+--------------------+----+----------+--------+
| timestamp| col|StringType|LongType|
+--------------------+----+----------+--------+
|2022-01-28 23:32:...|Col1| str1| 100|
|2022-01-28 23:32:...|Col2| str1| 100|
|2022-02-28 23:02:...|Col1| str2| 202|
|2022-02-28 23:02:...|Col2| str2| 202|
|2022-02-28 17:22:...|Col1| str3| 102|
|2022-02-28 17:22:...|Col2| str3| 102|
|2022-02-28 23:19:...|Col1| str4| 102|
|2022-02-28 23:19:...|Col2| str4| 102|
|2022-03-29 17:32:...|Col1| str5| 102|
|2022-03-29 17:32:...|Col2| str5| 102|
|2022-01-28 23:32:...|Col1| str6| 101|
|2022-01-28 23:32:...|Col2| str6| 101|
|2022-02-28 17:28:...|Col1| str7| 201|
|2022-02-28 17:28:...|Col2| str7| 201|
|2022-03-28 23:59:...|Col1| str8| 100|
|2022-03-28 23:59:...|Col2| str8| 100|
|2022-02-28 21:02:...|Col1| str9| 100|
|2022-02-28 21:02:...|Col2| str9| 100|
+--------------------+----+----------+--------+
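Building on that, here's an untested sketch of the null-filling refinement mentioned above; exploded_df is a placeholder for the result of the select you just saw (note the exploded name column defaults to "col"):
from pyspark.sql.functions import col, when

# group the value columns by their Spark type
names_by_type = {}
for field in df.schema.fields:
    if field.name != 'timestamp':
        names_by_type.setdefault(str(field.dataType), []).append(field.name)

exploded_df.select(
    col("timestamp"),
    col("col").alias("column"),
    *[
        # null out the value unless the exploded name is a column of this type
        when(col("col").isin(names), col(dtype)).alias(dtype)
        for dtype, names in names_by_type.items()
    ]
).show()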
Upvotes: 1