Reputation: 153
We have a PySpark DataFrame with several columns containing arrays of multiple values. Our goal is to get each value of these array columns onto its own row, while keeping the other original columns. So, starting with something like this:
data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]
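Loading this into a DataFrame (a minimal sketch, assuming an existing SparkSession named spark):

df = spark.createDataFrame(data, ["id", "list_a", "list_b"])
df.show(truncate=False)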
which gives:
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+
We would like to end up having:
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|A  |null  |1     |
|A  |null  |5     |
|B  |a     |null  |
|B  |b     |null  |
|C  |null  |1     |
+---+------+------+
We are thinking about several approaches, but all of them smell like dirty, complex, error-prone and inefficient workarounds.
Does anyone have an idea about how to solve this in an elegant manner?
Upvotes: 5
Views: 1676
Reputation: 8711
Try this dynamic solution.
Input:
data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]
df = spark.createDataFrame(data, ["id", "list_a", "list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+
Let's create a list of DataFrames, one for each of the array columns in df. Initialize it with empty DataFrames and then overwrite each entry in the for loop: for each array column, explode it, and replace all the other array columns with a NULL cast to string.
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode, expr

array_cols = df.columns[1:]  # just ignoring the id column
c = 0
dfarr = [spark.createDataFrame([], schema=StructType()) for i in array_cols]
for i in array_cols:
    dfarr[c] = df.withColumn(i, explode(col(i)))
    for j in array_cols:
        if i != j:
            dfarr[c] = dfarr[c].withColumn(j, expr("cast(null as string)"))
    c = c + 1
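Note: the other array columns are replaced with expr("cast(null as string)") rather than a bare lit(None) so they end up as StringType instead of NullType, keeping the schemas of all the DataFrames in dfarr aligned for the union below.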
Now dfarr is a list of DataFrames, each with a schema like
dfarr[0].printSchema()
root
|-- id: string (nullable = true)
|-- list_a: string (nullable = true)
|-- list_b: string (nullable = true)
dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+
The datatypes across dfarr all match now, so we can simply union them all. For this we need the reduce function from functools:
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionByName, dfs)
Applying it to our dfarr:
combo=unionAll(*dfarr)
combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|B  |a     |null  |
|B  |b     |null  |
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+
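If you also want the rows ordered like the expected output in the question, an orderBy on id can be appended (a small optional tweak; the order of rows within each id is still not guaranteed):

combo.orderBy("id").show(truncate=False)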
Upvotes: 1
Reputation: 36
In case both columns list_a and list_b could be null at the same time, I would add a 4th case to the dataset:
data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
    ("D", None, None),
]
df = spark.createDataFrame(data, ["id", "list_a", "list_b"])
I would then split the original df into 3 DataFrames (both columns null, list_a exploded, and list_b exploded) and then execute a unionByName:
from pyspark.sql.functions import col, explode_outer, lit

dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
    .withColumn("list_a", lit(None))\
    .withColumn("list_b", lit(None))

df1 = df\
    .withColumn("list_a", explode_outer(col("list_a")))\
    .withColumn("list_b", lit(None))\
    .filter(~col("list_a").isNull())

df2 = df\
    .withColumn("list_b", explode_outer(col("list_b")))\
    .withColumn("list_a", lit(None))\
    .filter(~col("list_b").isNull())

merged_df = df1.unionByName(df2).unionByName(dfnulls)
merged_df.show()
+---+------+------+
| id|list_a|list_b|
+---+------+------+
| A| a| null|
| A| c| null|
| B| a| null|
| B| b| null|
| A| null| 1|
| A| null| 5|
| C| null| 1|
| D| null| null|
+---+------+------+
Upvotes: 2
Reputation: 1751
The following approach might help you; it is based on Scala.
Basically, it explodes each list column individually and joins the resulting datasets on a dummy column to get the desired result.
import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}

val df1 = inputDF
  .withColumn("list_a", explode_outer(col("list_a")))
  .withColumn("random_join_col", concat(col("id"), lit("1")))
  .drop("list_b")

val df2 = inputDF
  .withColumn("list_b", explode_outer(col("list_b")))
  .withColumn("random_join_col", concat(col("id"), lit("2")))
  .drop("list_a")

val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")

// Drop rows that ended up with null in both list columns
finalDF.na.drop("all", Seq("list_a", "list_b")).orderBy("id").show(false)
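For anyone who wants to stay in PySpark, here is a rough sketch of the same idea (assuming the question's DataFrame is named df):

from pyspark.sql.functions import col, concat, explode_outer, lit

# Explode each list column separately and add a join key that can never match
# between the two sides, so the full outer join effectively behaves like a union.
df1 = (df
       .withColumn("list_a", explode_outer(col("list_a")))
       .withColumn("random_join_col", concat(col("id"), lit("1")))
       .drop("list_b"))

df2 = (df
       .withColumn("list_b", explode_outer(col("list_b")))
       .withColumn("random_join_col", concat(col("id"), lit("2")))
       .drop("list_a"))

final_df = df1.join(df2, ["id", "random_join_col"], "full_outer").drop("random_join_col")

# Drop rows where both list columns ended up null
final_df.na.drop(how="all", subset=["list_a", "list_b"]).orderBy("id").show(truncate=False)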
Upvotes: 1