Reputation: 23
I am looking for a way to add a column from one PySpark dataframe, let's say this is DF1:

| column1 |
|---|
| 123 |
| 234 |
| 345 |

to another PySpark dataframe, DF2, which will have any number of columns itself, but not column1:
| column2 | column3 | column4 |
|---|---|---|
| 000 | data | some1 |
| 253774 | etc | etc |
| 1096 | null | more |
| 999 | other | null |
The caveat here is that I would like to avoid using Pandas, and I would like to avoid pulling all of the data into a single partition if possible. DF2 will hold up to terabytes of data, and this will run distributed on an EMR cluster.
DF1 will be a fixed set of numbers, which could be more or fewer than the row count of DF2. If DF2 has more rows, the DF1 values should be repeated (think cycle). If DF1 has more rows, we never exceed the row count of DF2; we just attach a value to each row (it doesn't matter if we don't include all of the rows from DF1).
If these requirements seem strange, it is because the values themselves are important in DF1 and we need to use them in DF2, but it doesn't matter which DF1 value is attached to each DF2 row (we just don't want to repeat the same value over and over, though some duplicates are fine).
What I've Tried:
What I am hoping to find:
I am looking for a way to simply cycle over the values from DF1 and apply them to each row of DF2, doing it with native PySpark if possible.
In the end an example would look like this:
| column1 | column2 | column3 | column4 |
|---|---|---|---|
| 123 | 000 | data | some1 |
| 234 | 253774 | etc | etc |
| 345 | 1096 | null | more |
| 123 | 999 | other | null |
Upvotes: 0
Views: 1527
Reputation: 5940
The combination of the window functions `row_number` and `ntile` might be the answer:

1. Apply `row_number` on DF1 to get all records enumerated as a new column `id`.
2. Get the count of records in DF1 and store it as `df1_count`.
3. Apply `ntile(df1_count)` on DF2 as a new column `id`. `ntile` will split the DF2 rows into `df1_count` groups of as equal size as possible, numbered 1 to `df1_count` to match DF1's `row_number` ids.
4. Join DF1 and DF2 on the generated column `id` to combine both dataframes.
Alternatively, instead of `ntile(n)`, DF2 can also get a `row_number()`-based column `id`, which can then be used to calculate a modulus. Since `row_number` is 1-based, shift it before the mod so the result lines up with DF1's ids:

`df.withColumn("id_mod", (col("id") - 1) % lit(df1_count) + 1)`

and that `id_mod` can then be joined with DF1 using `DF1.id`.
Upvotes: 1