Reputation: 15
If I have two dataframes with the same number of rows and the same sequence of primary key values, is it possible to concatenate those two dataframes column-wise (like pd.concat(list_of_dataframes, axis=1)) without a join? A join would be an expensive operation, since it has to go through each row/primary key id to find matches.
If I have the following two dataframes in PySpark:
df1:
id    col1  col2  col3
1001  1     0     1
1002  0     1     1
1003  0     0     1
df2:
id    col4  col5
1001  1     0
1002  1     1
1003  0     1
How do I concatenate these two to get a resulting dataframe like this
id    col1  col2  col3  col4  col5
1001  1     0     1     1     0
1002  0     1     1     1     1
1003  0     0     1     0     1
without using a join? In pandas, I can easily do:
pd.concat([df1, df2], axis=1)
I want to avoid the operational overhead of a join, where each row of both dataframes has to be compared before merging, because I'm dealing with wide dataframes that I need to concatenate (around 20 dataframes, each roughly 500,000 rows by 20,000 columns). I'm assuming the reason I can't find an equivalent to the concat function in Spark is that when data is distributed, the primary key sequence isn't preserved, so a join is necessary to compare each row?
Ideally I want to concat all 20 dataframes (csv files) into one large dataframe, approximately 500k rows by 400k cols, and then do further filtering operations on it.
The reason I can't just do a df.toPandas() and then use the pd.concat() function is that the resulting dataframe would be too large and require too much memory to fit in the driver program.
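What I'm trying to avoid would look roughly like this (spark_dfs is just an illustrative name for the list of 20 dataframes), since everything has to be collected to the driver:
import pandas as pd

# spark_dfs: illustrative name for the list of 20 Spark dataframes.
# Each toPandas() collects a full dataframe to the driver, and pd.concat
# then materialises the entire ~500k x 400k result in driver memory.
pandas_dfs = [df.toPandas() for df in spark_dfs]
wide_df = pd.concat(pandas_dfs, axis=1)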
Upvotes: 0
Views: 2095
Reputation: 2086
I would normally use a join (merge) here, but you can also use a combination of unionByName and groupBy to achieve this without one:
import pyspark.sql.functions as F
# Create your example dataframes
data1 = [(1001, 1, 0, 1),
(1002, 0, 1, 1),
(1003, 0, 0, 1)]
columns1 = ["id", "col1", "col2", "col3"]
df1 = spark.createDataFrame(data1, columns1)
data2 = [(1001, 1, 0),
(1002, 1, 1),
(1003, 0, 1)]
columns2 = ["id", "col4", "col5"]
df2 = spark.createDataFrame(data2, columns2)
# Union by name, group by id, and keep the non-null value after the aggregation
result_df = df1.unionByName(df2, allowMissingColumns=True).groupBy("id") \
.agg(
F.coalesce(F.first("col1"), F.last("col1")).alias("col1"),
F.coalesce(F.first("col2"), F.last("col2")).alias("col2"),
F.coalesce(F.first("col3"), F.last("col3")).alias("col3"),
F.coalesce(F.first("col4"), F.last("col4")).alias("col4"),
F.coalesce(F.first("col5"), F.last("col5")).alias("col5")
)
result_df.show()
result:
+----+----+----+----+----+----+
|  id|col1|col2|col3|col4|col5|
+----+----+----+----+----+----+
|1001|   1|   0|   1|   1|   0|
|1002|   0|   1|   1|   1|   1|
|1003|   0|   0|   1|   0|   1|
+----+----+----+----+----+----+
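A note on scale: with ~20,000 columns per file it isn't practical to write one coalesce expression per column, and with more than two dataframes per id the coalesce(first, last) trick can miss a value sitting on a middle row of the group. Here is a sketch of the same idea with the aggregation list generated programmatically (the helper name concat_by_id is made up here; unionByName with allowMissingColumns=True needs Spark 3.1+):
import pyspark.sql.functions as F

def concat_by_id(dfs, id_col="id"):
    # Stack all dataframes on top of each other; missing columns become null.
    unioned = dfs[0]
    for df in dfs[1:]:
        unioned = unioned.unionByName(df, allowMissingColumns=True)
    # For every non-id column, keep the first non-null value per id.
    value_cols = [c for c in unioned.columns if c != id_col]
    aggs = [F.first(c, ignorenulls=True).alias(c) for c in value_cols]
    return unioned.groupBy(id_col).agg(*aggs)

result_df = concat_by_id([df1, df2])
result_df.show()
The groupBy still shuffles the unioned data by id, so it is not free, but it replaces a chain of 19 separate joins with a single aggregation.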
Upvotes: 2