I am trying to use a for loop to add new rows to a dataframe. So the input is:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz   25      123   234        345
Abc   40      456   567        678
And the output I want is this:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz   25      123   234        345
Xyz   26      789   123        234
Abc   40      456   567        678
Abc   41      890   456        567
So, the code I have is this:
df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
       .withColumn("ColB_lag2", df.ColB_lag1) \
       .withColumn("ColB_lag1", df.ColB) \
       .withColumn("ColB", someFunc())
The code works fine when I only have to add one row, but it breaks when I have to add multiple rows in a loop. So I used a for loop to accomplish it: at the top of each iteration I filter for the latest row, then run the logic above to calculate the values for the columns, and then append the new row to the dataset, which is used again at the top of the next iteration. The output ends up looking something like this:
ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz   25      123   234        345
Xyz   25      789   123
Xyz   26      789   123
Abc   40      456   567        678
Abc   40      890   456
Abc   41      890   456
The question is: do for loops in PySpark break down due to parallelization, or am I chaining too many functions in the loop (or is it the order of functions in the loop) that is causing this erratic behavior?
Happy to share more details if I have missed any key point here.
Edit 1: The for loop is as below:
from pyspark.sql.types import IntegerType

num_months = 5

df_final = spark.read.csv(input_path, header='true')
df_final.createOrReplaceTempView("df_final")

for i in range(num_months):
    df = spark.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
             FROM df_final
             GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
    """)
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    df_tmp = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
               .withColumn("ColB_lag2", df.ColB_lag1) \
               .withColumn("ColB_lag1", df.ColB) \
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)
    df_final.persist()

df_final.coalesce(1).write.csv(output_path + scenario_name + "_df_final", mode='overwrite', header='true')
Solution: The issue was with the union. Since I was dropping the columns and recalculating them, Spark adds those columns at the end of the data frame, and union matches columns by position, not by name. That is what was causing the issue in subsequent loop iterations: the data for the new rows was shifted over by a few columns. The solution was to explicitly select all the columns and re-order them before doing the union. The snippet above is simplified to where I could do it without dropping ColB_lag2; the actual code has another step in between, where I refresh some values from a join with another dataframe, and those columns need to be dropped before bringing them in from the new dataframe.
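A minimal sketch of that fix (assuming df_final and df_tmp as in the loop above) is to re-select df_tmp's columns in df_final's order before the union; on Spark 2.3 and later, unionByName does the name-based matching directly:

# Align df_tmp's columns to df_final's order, since union() matches
# columns by position rather than by name.
df_tmp = df_tmp.select(df_final.columns)
df_final = df_final.union(df_tmp)

# Alternative on Spark 2.3+: match columns by name instead of position.
# df_final = df_final.unionByName(df_tmp)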
Answer:
Your problem is that you're creating the temporary view on one version of the data frame (the original data from the csv source) and expecting it to reflect changes made to the df_final data frame variable. The temporary view df_final does not pick up the data added to the data frame df_final as the loop runs: data frames are immutable, and the view stays bound to the data the data frame held when the view was created. One way to solve this is to replace the temporary view in the loop too:
# the top part of your loop...
df_final = df_final.union(df_tmp)
df_final.createOrReplaceTempView("df_final")
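To see why the refresh is needed, here is a small self-contained demonstration (with made-up data) that a temp view stays bound to the contents it was created from, not to the Python variable:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("Xyz", 25)], ["ColA", "ColNum"])
df.createOrReplaceTempView("df_final")

# Rebinding the Python variable does not touch the registered view.
df = df.union(spark.createDataFrame([("Xyz", 26)], ["ColA", "ColNum"]))

spark.sql("SELECT COUNT(*) AS n FROM df_final").show()  # n = 1: the view is stale
df.createOrReplaceTempView("df_final")                   # refresh the view
spark.sql("SELECT COUNT(*) AS n FROM df_final").show()  # n = 2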