landoooo
landoooo

Reputation: 153

Pyspark > Dataframe with multiple array columns into multiple rows with one value each

We have a pyspark dataframe with several columns containing arrays with multiple values. Our goal is to have each of this values of these columns in several rows, keeping the initial different columns. So, starting with something like this:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]

Whats:

+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

We would like to end up having:

+---+----+----+
|id |col |col |
+---+----+----+
|A  |a   |null|
|A  |c   |null|
|A  |null|1   |
|A  |null|5   |
|B  |a   |null|
|B  |b   |null|
|C  |null|1   |
+---+----+----+

We are thinking about several approaches:

  1. prefixing each value with a column indicator, merge all the arrays into a single one, explode it and reorganize the different values into different columns
  2. split the dataframe into several, each one with one of these array columns, explode the array column and then, concatenating the dataframes

But all of them smell like dirty, complex, error prone and inefficient workarounds.

Does anyone have an idea about how to solve this in an elegant manner?

Upvotes: 5

Views: 1676

Answers (3)

stack0114106
stack0114106

Reputation: 8711

Try this dynamic solution.

Input:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]
df=spark.createDataFrame(data,["id","list_a","list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

Lets create an array of Dataframes for each of the array columns in df. Initialize first with empty Dataframe and then override it in the for loop. For each column, explode it and for all other columns, change the datatype to string with NULL.

from pyspark.sql.types import *
array_cols=df.columns[1:]  #just ignoring the ID column
c=0
dfarr=[spark.createDataFrame([],schema=StructType()) for i in array_cols ]
for i in array_cols:
    dfarr[c]=df.withColumn(i,explode(col(i)))
    for j in array_cols:
        if(i!=j):
            dfarr[c]=dfarr[c].withColumn(j,expr(" cast(null as string) "))
    c=c+1

Now, dfarr is an array of dataframes with the schema like

dfarr[0].printSchema()
root
 |-- id: string (nullable = true)
 |-- list_a: string (nullable = true)
 |-- list_b: string (nullable = true)

dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+

The datatypes in dfarr is all similar now, so just do a union of all them. For this we need the reduce function from functools

from functools import reduce  
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionByName, dfs) 

Applying to our dfarr

combo=unionAll(*dfarr)

combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|B  |a     |null  |
|B  |b     |null  |
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+

Upvotes: 1

ferran
ferran

Reputation: 36

In case both columns list_a and list_b could be empty, I would add a 4th case in the dataset

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
    ("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])

I would then split the original df in 3 (both nulls, list_a exploded and list_b exploded) and the execute a unionByName

dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
    .withColumn("list_a", lit(None))\
    .withColumn("list_b", lit(None))

df1 = df\
    .withColumn("list_a", explode_outer(col("list_a")))\
    .withColumn("list_b", lit(None))\
    .filter(~col("list_a").isNull())

df2 = df\
    .withColumn("list_b", explode_outer(col("list_b")))\
    .withColumn("list_a", lit(None))\
    .filter(~col("list_b").isNull())

merged_df = df1.unionByName(df2).unionByName(dfnulls)

merged_df.show()

+---+------+------+
| id|list_a|list_b|
+---+------+------+
|  A|     a|  null|
|  A|     c|  null|
|  B|     a|  null|
|  B|     b|  null|
|  A|  null|     1|
|  A|  null|     5|
|  C|  null|     1|
|  D|  null|  null|
+---+------+------+

Upvotes: 2

Sivakumar
Sivakumar

Reputation: 1751

The following approach might help you and it's based on Scala

Basically exploding the respective list columns individually and joining the datasets based on the dummy column to get the desired result.

import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}


val df1 = inputDF
  .withColumn("list_a", explode_outer(col("list_a")))
  .withColumn("random_join_col", concat(col("id"), lit("1")))
  .drop("list_b")

val df2 = inputDF
  .withColumn("list_b", explode_outer(col("list_b")))
  .withColumn("random_join_col", concat(col("id"), lit("2")))
  .drop("list_a")


val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")

// Drop rows, if it got null value on both the list columns
finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)

Upvotes: 1

Related Questions