Omer Farooq Ahmed

Reputation: 15

How to concatenate two PySpark DataFrames column-wise

If I have two dataframes with the same number of rows and the same sequence of primary key values, is it possible to concatenate them column-wise (like pd.concat(list_of_dataframes, axis=1)) without a join? A join would be an expensive operation, since it has to match every row by its primary key.

If I have the following two dataframes in PySpark:

df1:

id  col1    col2    col3
1001    1   0   1
1002    0   1   1
1003    0   0   1

df2:

id  col4    col5
1001    1   0
1002    1   1
1003    0   1

How do I concatenate these two to get a resulting dataframe like this,

id  col1    col2    col3    col4    col5
1001    1   0   1   1   0
1002    0   1   1   1   1
1003    0   0   1   0   1

without using a join? In pandas I can easily do: pd.concat([df1, df2], axis=1)
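
For reference, a minimal pandas sketch of what I mean with the example frames above (hypothetical pdf1/pdf2 names, id set as the index so the rows align):

import pandas as pd

# The two small example frames, keyed on id so concat aligns rows by index
pdf1 = pd.DataFrame(
    {"id": [1001, 1002, 1003], "col1": [1, 0, 0], "col2": [0, 1, 0], "col3": [1, 1, 1]}
).set_index("id")
pdf2 = pd.DataFrame(
    {"id": [1001, 1002, 1003], "col4": [1, 1, 0], "col5": [0, 1, 1]}
).set_index("id")

# Column-wise concatenation, no hand-written join logic
result = pd.concat([pdf1, pdf2], axis=1)
print(result)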

I want to avoid the operational overhead of a join, where each row of both dataframes would need to be compared before merging, because I'm dealing with wide dataframes that I need to concatenate (around 20 dataframes, each roughly 500,000 rows by 20,000 columns). I'm assuming the reason I can't find an equivalent of the concat function in Spark is that when the data is distributed, the primary key order isn't preserved, so a join is necessary to match each row?

Ideally I want to concat all 20 dataframes (CSV files) into one large dataframe of approximately 500k rows by 400k columns and then do further filtering operations on it.

The reason I can't just do a df.toPandas() and then use pd.concat() is that the resulting dataframe would be too large to fit in the driver's memory.

Upvotes: 0

Views: 2095

Answers (1)

Yefet

Reputation: 2086

I would normally use a join (merge) here, but you can also use a combination of unionByName and groupBy to achieve this without one:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create (or reuse) the SparkSession so the snippet runs standalone
spark = SparkSession.builder.getOrCreate()

# Create your example dataframes
data1 = [(1001, 1, 0, 1),
         (1002, 0, 1, 1),
         (1003, 0, 0, 1)]
columns1 = ["id", "col1", "col2", "col3"]
df1 = spark.createDataFrame(data1, columns1)

data2 = [(1001, 1, 0),
         (1002, 1, 1),
         (1003, 0, 1)]
columns2 = ["id", "col4", "col5"]
df2 = spark.createDataFrame(data2, columns2)

# Union by name, group by id, and keep the non-null value for each column after the aggregation
result_df = df1.unionByName(df2, allowMissingColumns=True).groupBy("id") \
    .agg(
        F.coalesce(F.first("col1"), F.last("col1")).alias("col1"),
        F.coalesce(F.first("col2"), F.last("col2")).alias("col2"),
        F.coalesce(F.first("col3"), F.last("col3")).alias("col3"),
        F.coalesce(F.first("col4"), F.last("col4")).alias("col4"),
        F.coalesce(F.first("col5"), F.last("col5")).alias("col5")
    )

result_df.show()

result:

+----+----+----+----+----+----+
|  id|col1|col2|col3|col4|col5|
+----+----+----+----+----+----+
|1001|   1|   0|   1|   1|   0|
|1002|   0|   1|   1|   1|   1|
|1003|   0|   0|   1|   0|   1|
+----+----+----+----+----+----+
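
Since your real dataframes have tens of thousands of columns, spelling out every aggregation by hand isn't practical. A minimal sketch (assuming every non-id column should be collapsed the same way, and Spark 3.1+ for allowMissingColumns) that builds the same expressions in a list comprehension:

import pyspark.sql.functions as F

# Collect every value column from both frames (everything except the id key)
value_cols = [c for c in df1.columns if c != "id"] + [c for c in df2.columns if c != "id"]

# One coalesce(first, last) expression per column: whichever of the two rows
# in each id group carries the value, one of the two picks it up
agg_exprs = [
    F.coalesce(F.first(c), F.last(c)).alias(c)
    for c in value_cols
]

result_df = (
    df1.unionByName(df2, allowMissingColumns=True)
       .groupBy("id")
       .agg(*agg_exprs)
)

Keep in mind that with hundreds of thousands of columns the groupBy/agg plan itself becomes very large, so it may be worth testing this on a subset of columns first.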

Upvotes: 2
