Reputation: 498
I read several other posts about combining dataframes but my issue is a little different as I don't know the dataframe names in advance. Here's why:
I have a function that produces a data frame showing first purchases and subsequent purchases:
renew(df, 2).show()
+--------------+----------+----------+---------+----------+---------+
|First_Purchase|Renewal_Mo|second_buy|third_buy|fourth_buy|fifth_buy|
+--------------+----------+----------+---------+----------+---------+
|             2|         2|        20|        4|         3|        2|
|             2|        48|         2|        0|         0|        0|
|             2|        24|         2|        0|         0|        0|
|             2|        12|        12|        2|         0|        0|
|             2|         6|         3|        1|         0|        0|
+--------------+----------+----------+---------+----------+---------+
I can change the input to produce dataframe where First_Purchase = 12 (or 3, or 6 etc) and produce another dataframe:
renew(df, 12).show()
+--------------+----------+----------+---------+----------+---------+
|First_Purchase|Renewal_Mo|second_buy|third_buy|fourth_buy|fifth_buy|
+--------------+----------+----------+---------+----------+---------+
|            12|         2|        12|        1|         0|        0|
|            12|         1|         9|        1|         0|        0|
|            12|        48|         1|        0|         0|        0|
|            12|        24|         7|        0|         0|        0|
|            12|        12|        64|        4|         0|        0|
|            12|         6|         6|        1|         0|        0|
|            12|         3|         4|        0|         0|        0|
+--------------+----------+----------+---------+----------+---------+
I need to loop through various First_Purchase values and create one dataframe, e.g.:
month = [2, 12]
for x in month:
    renew_modified(all_renewals_cleaned_md, x)
The code above loops through, but each iteration overwrites the previous dataframe, leaving me with only the last one rather than appending them.
If I could export each dataframe in the loop to a new and uniquely named dataframe I could concatenate them like this:
purchases_combined = [df1,df12,df3,df6]
But I haven't been able to come up with the code to loop through, export each dataframe to a unique name dynamically, and then combine them so they would look like this:
+--------------+----------+----------+---------+----------+---------+
|First_Purchase|Renewal_Mo|second_buy|third_buy|fourth_buy|fifth_buy|
+--------------+----------+----------+---------+----------+---------+
|             2|         2|        20|        4|         3|        2|
|             2|        48|         2|        0|         0|        0|
|             2|        24|         2|        0|         0|        0|
|             2|        12|        12|        2|         0|        0|
|             2|         6|         3|        1|         0|        0|
|            12|         2|        12|        1|         0|        0|
|            12|         1|         9|        1|         0|        0|
|            12|        48|         1|        0|         0|        0|
|            12|        24|         7|        0|         0|        0|
|            12|        12|        64|        4|         0|        0|
|            12|         6|         6|        1|         0|        0|
|            12|         3|         4|        0|         0|        0|
+--------------+----------+----------+---------+----------+---------+
Does anyone have any suggestions?
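For context, here is the pattern I'm reaching for sketched in plain Python: collect each loop result in a list instead of inventing uniquely named variables. The `renew` below is a hypothetical stand-in for my real function, just so the sketch is self-contained:

```python
# Hypothetical stand-in for renew(df, x): it only tags each row with the
# month so the combined output is recognizable. Not the real function.
def renew(rows, month):
    return [(month, r) for r in rows]

rows = ["a", "b"]
frames = []                      # accumulator instead of df2, df12, ...
for m in [2, 12]:
    frames.append(renew(rows, m))
# frames now holds one result per month, ready to be concatenated
```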
Upvotes: 0
Views: 1047
Reputation: 655
What you're missing is a loop that takes the dataframe built so far and concatenates the current loop value (which is itself a dataframe). All you need to do is: create an empty dataframe, loop over the available dataframes, and keep concatenating each one onto the empty dataframe created in step 1.
Please see the example below. (By the way, this solution can be scaled by calling the function from parallel threads and using Spark's FAIR job scheduling, but that's out of scope for this question.)
Trivial example:
%pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def concat_df(prev_df, this_df):
    prev_df.createOrReplaceTempView("prev_df")
    this_df.createOrReplaceTempView("this_df")
    # UNION the dataframe built so far with the current loop value.
    # Note: SQL UNION also removes duplicate rows; use UNION ALL to keep them.
    return spark.sql("SELECT * FROM prev_df UNION SELECT * FROM this_df")

# Schema needed for creating the 'empty' dataframe as the first step
df_schema = StructType().add(StructField("name", StringType(), True)) \
                        .add(StructField("id", IntegerType(), True))

# This empty df will accumulate our final result. Because UNION matches
# columns by position, its column names ("name", "id") win in the output.
prev_empty_df = spark.createDataFrame([], df_schema)

df1 = spark.createDataFrame([("John", 1), ("Sam", 2)], schema=["name", "age"])
df2 = spark.createDataFrame([("Adam", 30), ("Keaunu", 25)], schema=["name", "age"])
df3 = spark.createDataFrame([("David", 100), ("Xavier", 250)], schema=["name", "age"])

print("Before Concat df1 ->")
df1.show(20)
print("Before Concat df2 ->")
df2.show(20)
print("Before Concat df3 ->")
df3.show(20)

for dframe in [df1, df2, df3]:
    prev_empty_df = concat_df(prev_empty_df, dframe)

print("FINAL DF")
prev_empty_df.show(20, False)
And the result:
Before Concat df1 ->
+----+---+
|name|age|
+----+---+
|John| 1|
| Sam| 2|
+----+---+
Before Concat df2 ->
+------+---+
| name|age|
+------+---+
| Adam| 30|
|Keaunu| 25|
+------+---+
Before Concat df3 ->
+------+---+
| name|age|
+------+---+
| David|100|
|Xavier|250|
+------+---+
FINAL DF
+------+---+
|name |id |
+------+---+
|John |1 |
|Sam |2 |
|Xavier|250|
|David |100|
|Keaunu|25 |
|Adam |30 |
+------+---+
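The loop above is essentially a left fold: start from an empty accumulator and union each dataframe onto it in turn. The same accumulate-and-concatenate shape in plain Python, where list `+` plays the role of the union step (with one hedge: list concatenation, like `UNION ALL`, keeps duplicates, whereas the SQL `UNION` in `concat_df` removes them):

```python
def concat_all(chunks):
    acc = []                      # plays the role of the empty dataframe
    for chunk in chunks:
        acc = acc + chunk         # plays the role of the concat_df / UNION step
    return acc

combined = concat_all([[("John", 1), ("Sam", 2)],
                       [("Adam", 30), ("Keaunu", 25)],
                       [("David", 100), ("Xavier", 250)]])
# combined holds all six rows in input order
```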
Upvotes: 1
Reputation: 498
I figured it out:
from functools import reduce
from pyspark.sql import DataFrame

result = []
month_list = [12, 2]
for x in month_list:
    df2 = renew(df, x)
    result.append(df2)

# Fold the list of dataframes into one by pairwise union
df3 = reduce(DataFrame.union, result)
df3.show()
produces the dataframe:
+--------------+----------+----------+---------+----------+---------+
|First_Purchase|Renewal_Mo|second_buy|third_buy|fourth_buy|fifth_buy|
+--------------+----------+----------+---------+----------+---------+
| 12| 2| 12| 1| 0| 0|
| 12| 1| 9| 1| 0| 0|
| 12| 48| 1| 0| 0| 0|
| 12| 24| 7| 0| 0| 0|
| 12| 12| 64| 4| 0| 0|
| 12| 6| 6| 1| 0| 0|
| 12| 3| 4| 0| 0| 0|
| 2| 2| 20| 4| 3| 2|
| 2| 48| 2| 0| 0| 0|
| 2| 24| 2| 0| 0| 0|
| 2| 12| 12| 2| 0| 0|
| 2| 6| 3| 1| 0| 0|
+--------------+----------+----------+---------+----------+---------+
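For what it's worth, `reduce(DataFrame.union, result)` folds the list pairwise, i.e. `result[0].union(result[1]).union(result[2])...`, and `union` matches columns by position (Spark also offers `unionByName` when the column order might differ). The same fold shown on plain lists, with `+` standing in for `union`:

```python
from functools import reduce

parts = [[12, 2], [48, 24], [6, 3]]
# reduce folds pairwise: ((parts[0] + parts[1]) + parts[2])
combined = reduce(lambda a, b: a + b, parts)
# combined is the single flattened list [12, 2, 48, 24, 6, 3]
```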
Upvotes: 0