M. Mate

Reputation: 47

Unpivoting pyspark dataframe with multiple data types

Suppose, I have the following spark dataframe (df):

from datetime import datetime
import pandas as pd

ts = [datetime(2022, 1, 1, 0), datetime(2022, 1, 1, 1), datetime(2022, 1, 1, 2)] 
pandas_df = pd.DataFrame({"Timestamp": ts, "column_a": ["a", "b", "c"], "column_b": [1.1, 2.2, 3.3], "column_c": [5, 6, 7]})
df = spark.createDataFrame(pandas_df)
df = df.withColumn("column_c", df.column_c.cast("int"))

which would look like this:

   Timestamp            column_a  column_b  column_c
0  2022-01-01 00:00:00  a         1.1       5
1  2022-01-01 01:00:00  b         2.2       6
2  2022-01-01 02:00:00  c         3.3       7
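
For reference, df.dtypes on this frame should return the (name, type) pairs that the per-type loop further below keys on:

df.dtypes
# [('Timestamp', 'timestamp'), ('column_a', 'string'), ('column_b', 'double'), ('column_c', 'int')]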

Now, I would like to unpivot it using pyspark, and get the below result:

Timestamp                     Columns   string  double  int
2022-01-01T02:00:00.000+0000  column_c  null    null    7
2022-01-01T02:00:00.000+0000  column_b  null    3.3     null
2022-01-01T02:00:00.000+0000  column_a  c       null    null
2022-01-01T01:00:00.000+0000  column_c  null    null    6
2022-01-01T01:00:00.000+0000  column_b  null    2.2     null
2022-01-01T01:00:00.000+0000  column_a  b       null    null
2022-01-01T00:00:00.000+0000  column_c  null    null    5
2022-01-01T00:00:00.000+0000  column_b  null    1.1     null
2022-01-01T00:00:00.000+0000  column_a  a       null    null

One way I can think of is unpivoting each type separately and then joining the resulting dataframes:

import pyspark.sql.functions as func

final_df = None

for data_type in ["string", "double", "int"]:
    # Pull out the columns of this type and build a stack() expression for them.
    dtype_cols = [item[0] for item in df.dtypes if item[1] == data_type]
    stack_str = ",".join([f"'{item}', {item}" for item in dtype_cols])
    expression = f"stack({len(dtype_cols)}, {stack_str}) as (Columns, {data_type})"
    untransposed_df = df.select("Timestamp", func.expr(expression))
    if final_df is None:
        final_df = untransposed_df
    else:
        final_df = final_df.join(untransposed_df, ["Timestamp", "Columns"], how="outer")
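
For reference, sorting the joined result should reproduce the layout above; a quick sanity check, assuming the loop has run and final_df is populated:

final_df.orderBy(func.desc("Timestamp"), func.desc("Columns")).show(truncate=False)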

This doesn't look very efficient as it involves multiple joins. Is there a better way of doing this?

Thank you!

Upvotes: 0

Views: 725

Answers (2)

Matt Andruff
Matt Andruff

Reputation: 5155

With more time, here's the complete answer:

from pyspark.sql.functions import array, col, explode, lit, struct

# Use a list comprehension to pull out the fields we will need.
no_timestamp_columns = [field for field in df.schema.fields if field.name != 'timestamp']

df.select(
    col("timestamp"),
    explode(  # make many rows from this array
        array(*[  # use a list comprehension to build the array
            struct(  # use a struct to group the data we want as columns together
                lit(column.name).alias("column"),  # for each existing column
                # Create a standard set of columns for all values, nulling out the
                # ones that aren't the column we are working with and providing the
                # value for the one that is.
                *[
                    (lit(None) if value.name != column.name else col(column.name))
                        .cast(value.dataType)
                        .alias(str(value.dataType))
                    for value in no_timestamp_columns
                ]
            )
            for column in no_timestamp_columns
        ])
    ).alias("columns")
).select(
    col("timestamp"),
    col("columns.*"),
).show()
+--------------------+------+----------+--------+
|           timestamp|column|StringType|LongType|
+--------------------+------+----------+--------+
|2022-01-28 23:32:...|  Col1|      str1|    null|
|2022-01-28 23:32:...|  Col2|      null|     100|
|2022-02-28 23:02:...|  Col1|      str2|    null|
|2022-02-28 23:02:...|  Col2|      null|     202|
|2022-02-28 17:22:...|  Col1|      str3|    null|
|2022-02-28 17:22:...|  Col2|      null|     102|
|2022-02-28 23:19:...|  Col1|      str4|    null|
|2022-02-28 23:19:...|  Col2|      null|     102|
|2022-03-29 17:32:...|  Col1|      str5|    null|
|2022-03-29 17:32:...|  Col2|      null|     102|
|2022-01-28 23:32:...|  Col1|      str6|    null|
|2022-01-28 23:32:...|  Col2|      null|     101|
|2022-02-28 17:28:...|  Col1|      str7|    null|
|2022-02-28 17:28:...|  Col2|      null|     201|
|2022-03-28 23:59:...|  Col1|      str8|    null|
|2022-03-28 23:59:...|  Col2|      null|     100|
|2022-02-28 21:02:...|  Col1|      str9|    null|
|2022-02-28 21:02:...|  Col2|      null|     100|
+--------------------+------+----------+--------+
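
If you would rather see the short lower-case type names in the header instead of StringType/LongType, DataType.simpleString() gives the names df.dtypes uses; one hedged tweak (my suggestion, not part of the answer above) is to swap str(value.dataType) for value.dataType.simpleString() in the inner comprehension:

from pyspark.sql.types import LongType, StringType

# simpleString() renders the short SQL type names, e.g. "string" and "bigint".
print(StringType().simpleString(), LongType().simpleString())  # string bigint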


I should note that if you use an array, you have to keep the form of the structs consistent. That was what took me the longest to think through. You can either cast all columns to string and then work out in the select how to cast them back, or you can take the approach I did above, which is to keep the original type but fill in nulls in a consistent manner.
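
A minimal sketch of that first alternative (cast everything to string, stack once in a single pass, then cast back), using the dataframe from the question; the helper names (value_cols, dtype_map, long_df, typed_df) are mine, not part of the answer above:

from pyspark.sql import functions as F

value_cols = [c for c in df.columns if c != "Timestamp"]
dtype_map = dict(df.dtypes)  # e.g. {'column_a': 'string', 'column_b': 'double', 'column_c': 'int'}
types = sorted(set(dtype_map[c] for c in value_cols))

# One stack() pass over string-cast values, so no joins are needed.
stack_expr = ", ".join(f"'{c}', cast({c} as string)" for c in value_cols)
long_df = df.select(
    "Timestamp",
    F.expr(f"stack({len(value_cols)}, {stack_expr}) as (Columns, value)"),
)

# Cast the single string value back out into one typed column per data type;
# when() without otherwise() leaves the non-matching rows null.
typed_df = long_df.select(
    "Timestamp",
    "Columns",
    *[
        F.when(
            F.col("Columns").isin([c for c in value_cols if dtype_map[c] == t]),
            F.col("value").cast(t),
        ).alias(t)
        for t in types
    ],
)
typed_df.show(truncate=False)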

Upvotes: 1

Matt Andruff

Reputation: 5155

Here's most of the answer. Admittedly I don't put it in the exact format you asked for, but it still follows the spirit; it's missing some nulls. You could likely refine the logic a little more to produce the format you're interested in, but I'm not sure the investment is worth it. With a little more time you might be able to create more substructure in the struct for the column values, keyed by column name, and then use the columns column to select the values you want, but this gives you a general idea of how to go about it.

from pyspark.sql.functions import array, col, explode, lit, struct

data = [
    ("2022-01-28 23:32:52.0", "str1", 100),
    ("2022-02-28 23:02:40.0", "str2", 202),
    ("2022-02-28 17:22:45.0", "str3", 102),
    ("2022-02-28 23:19:37.0", "str4", 102),
    ("2022-03-29 17:32:02.0", "str5", 102),
    ("2022-01-28 23:32:40.0", "str6", 101),
    ("2022-02-28 17:28:09.0", "str7", 201),
    ("2022-03-28 23:59:54.0", "str8", 100),
    ("2022-02-28 21:02:40.0", "str9", 100),
]

df = spark.createDataFrame(data)
df = df.toDF("timestamp", "Col1", "Col2")
df.show()
+--------------------+----+----+                                                
|           timestamp|Col1|Col2|
+--------------------+----+----+
|2022-01-28 23:32:...|str1| 100|
|2022-02-28 23:02:...|str2| 202|
|2022-02-28 17:22:...|str3| 102|
|2022-02-28 23:19:...|str4| 102|
|2022-03-29 17:32:...|str5| 102|
|2022-01-28 23:32:...|str6| 101|
|2022-02-28 17:28:...|str7| 201|
|2022-03-28 23:59:...|str8| 100|
|2022-02-28 21:02:...|str9| 100|
+--------------------+----+----+
df.select(
    col("timestamp"),
    array(  # build an array of the column names
        *[lit(field.name) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("columns"),
    struct(  # build a struct of the inverted values
        # use a list comprehension to pass varargs in the format we want
        *[col(field.name).alias(str(field.dataType)) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("data")
).select(
    col("timestamp"),
    explode(col("columns")),
    col("data.*")
).show()
+--------------------+----+----------+--------+
|           timestamp| col|StringType|LongType|
+--------------------+----+----------+--------+
|2022-01-28 23:32:...|Col1|      str1|     100|
|2022-01-28 23:32:...|Col2|      str1|     100|
|2022-02-28 23:02:...|Col1|      str2|     202|
|2022-02-28 23:02:...|Col2|      str2|     202|
|2022-02-28 17:22:...|Col1|      str3|     102|
|2022-02-28 17:22:...|Col2|      str3|     102|
|2022-02-28 23:19:...|Col1|      str4|     102|
|2022-02-28 23:19:...|Col2|      str4|     102|
|2022-03-29 17:32:...|Col1|      str5|     102|
|2022-03-29 17:32:...|Col2|      str5|     102|
|2022-01-28 23:32:...|Col1|      str6|     101|
|2022-01-28 23:32:...|Col2|      str6|     101|
|2022-02-28 17:28:...|Col1|      str7|     201|
|2022-02-28 17:28:...|Col2|      str7|     201|
|2022-03-28 23:59:...|Col1|      str8|     100|
|2022-03-28 23:59:...|Col2|      str8|     100|
|2022-02-28 21:02:...|Col1|      str9|     100|
|2022-02-28 21:02:...|Col2|      str9|     100|
+--------------------+----+----------+--------+


Upvotes: 1
