Fluxy

Reputation: 2978

How to stack two columns into a single one in PySpark?

I have the following PySpark DataFrame:

id   col1   col2
A    2      3
A    2      4
A    4      6
B    1      2

I want to stack col1 and col2 in order to get a single column as follows:

id   col3
A    2   
A    3
A    4
A    6
B    1
B    2

How can I do so?

df = (
    sc.parallelize([
        ('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
        ('B', 1, 2),
    ]).toDF(["id", "col1", "col2"])
)

Upvotes: 6

Views: 8436

Answers (3)

Preetham

Reputation: 577

A fairly simple solution if only a few columns are involved:

df = (
    sc.parallelize([
        ('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
        ('B', 1, 2),
    ]).toDF(["id", "col1", "col2"])
)


df.show()

+---+----+----+
| id|col1|col2|
+---+----+----+
|  A|   2|   3|
|  A|   2|   4|
|  A|   4|   6|
|  B|   1|   2|
+---+----+----+

df1 = df.select(['id', 'col1'])
df2 = df.select(['id', 'col2']).withColumnRenamed('col2', 'col1')

# stack the two single-column frames and remove repeated (id, value) pairs
df_new = df1.union(df2)
df_new = df_new.drop_duplicates()
df_new.show()

+---+----+
| id|col1|
+---+----+
|  A|   3|
|  A|   4|
|  B|   1|
|  A|   6|
|  A|   2|
|  B|   2|
+---+----+
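
If you want the column to be called col3 as in the question, you can rename it at the end (a one-line sketch using the standard withColumnRenamed):

df_new = df_new.withColumnRenamed('col1', 'col3')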

Upvotes: 1

akuiper

Reputation: 214927

The simplest approach is to merge col1 and col2 into an array column and then explode it:

df.show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  A|   2|   3|
|  A|   2|   4|
|  A|   4|   6|
|  B|   1|   2|
+---+----+----+

df.selectExpr('id', 'explode(array(col1, col2))').show()
+---+---+
| id|col|
+---+---+
|  A|  2|
|  A|  3|
|  A|  2|
|  A|  4|
|  A|  4|
|  A|  6|
|  B|  1|
|  B|  2|
+---+---+

You can drop duplicates if you don't need them.
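
For instance (a minimal sketch; the as col3 alias is only there to match the column name asked for in the question):

df.selectExpr('id', 'explode(array(col1, col2)) as col3').dropDuplicates().show()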

Upvotes: 10

Sander Lam

Reputation: 83

To do this, group by "id", collect the values of both "col1" and "col2" into lists in an aggregation, concatenate them, and then explode the result back into a single column. To get the unique numbers, drop the duplicates afterwards.

Since the numbers are also sorted in your desired result, the concatenated list is sorted inside the aggregation with sort_array.

The following code:

from pyspark.sql.functions import concat, collect_list, explode, col, sort_array

df = (
    sc.parallelize([
        ('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
        ('B', 1, 2),
    ]).toDF(["id", "col1", "col2"])
)

result = df.groupBy("id") \
    .agg(sort_array(concat(collect_list("col1"), collect_list("col2"))).alias("all_numbers")) \
    .orderBy("id") \
    .withColumn("number", explode(col("all_numbers"))) \
    .dropDuplicates() \
    .select("id", "number")

result.show()

will yield:

+---+------+
| id|number|
+---+------+
|  A|     2|
|  A|     3|
|  A|     4|
|  A|     6|
|  B|     1|
|  B|     2|
+---+------+

Upvotes: 1
