Apoorv Agarwal

Reputation: 13

PySpark: 'for' loops to add rows to a dataframe

I am trying to use a for loop to add new rows to a dataframe. So the input is:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz     25    123      234        345
Abc     40    456      567        678

And the output I want is this:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz     25    123      234        345
Xyz     26    789      123        234
Abc     40    456      567        678
Abc     41    890      456        567

So, the code I have is this:

from pyspark.sql.types import IntegerType

df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
       .withColumn("ColB_lag2", df.ColB_lag1) \
       .withColumn("ColB_lag1", df.ColB) \
       .withColumn("ColB", someFunc())
The code works fine when I only have to add one row, but it breaks when I have to add multiple rows in a loop, so I used a for loop. At the top of each iteration I filter for the latest row, run the logic above to calculate the values for the columns, and append the new row to the dataset, which is then used again at the top of the next iteration. The output ends up looking something like this:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz     25    123      234        345
Xyz     25    789      123
Xyz     26    789      123
Abc     40    456      567        678
Abc     40    890      456
Abc     41    890      456

The question is: do for loops in PySpark break down because of parallelization, or am I chaining too many functions in the loop (or ordering them badly), and is that what causes this erratic behavior?

Happy to share more details if I have missed out on any key point here.

Edit 1: The For loop is as below:

from pyspark.sql.types import IntegerType

num_months = 5
# 'spark' is the SparkSession; createOrReplaceTempView returns None,
# so the view registration has to go on its own line.
df_final = spark.read.csv(input_path, header='true')
df_final.createOrReplaceTempView("df_final")

for i in range(num_months):
    # Grab the latest row per ColA from the temp view
    df = spark.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
            FROM df_final
            GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
        """)
    # Drop the join helper columns and the column being recalculated
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    # Shift the lag columns and compute the new row's values
    df_tmp = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
               .withColumn("ColB_lag2", df.ColB_lag1) \
               .withColumn("ColB_lag1", df.ColB) \
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)

df_final.persist()
df_final.coalesce(1).write.csv(output_path + scenario_name + "_df_final", mode='overwrite', header='true')

Solution: The issue was with the union. Since I was dropping the columns and recalculating them, Spark appends those columns to the end of the data frame, and union resolves columns by position, not by name. This is what was corrupting the data in subsequent loops, as the values shifted by a few columns in the new rows. The solution was to explicitly select all the columns and re-order them before doing the union. The snippet above is simplified so that I could do it without dropping ColB_lag2; the actual code has another step in between, where I refresh some values from a join with another dataframe, and those columns need to be dropped before being brought in from the new dataframe.
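For reference, a minimal sketch of that fix, reusing the df_final and df_tmp names from the loop above: either re-select df_tmp's columns in df_final's order before the union, or, on Spark 2.3+, use unionByName, which matches columns by name instead of position.

# Option 1: force df_tmp into df_final's column order before the positional union
df_final = df_final.union(df_tmp.select(*df_final.columns))

# Option 2 (Spark 2.3+): unionByName resolves columns by name, not position
df_final = df_final.unionByName(df_tmp)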

Upvotes: 1

Views: 14907

Answers (1)

ernest_k

Reputation: 45319

Your problem is that you're creating the temporary view from one version of the data frame (the original data loaded from the CSV source) and expecting it to reflect changes later made to the df_final data frame variable.

The temporary view df_final does not pick up the changes made to the data frame variable df_final as the loop runs, because data frames are immutable. One way to solve this is to replace the temporary view in the loop too:

# the top part of your loop...
df_final = df_final.union(df_tmp)
df_final.createOrReplaceTempView("df_final")
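
With the view replaced at the end of each iteration, the SQL query at the top of the next iteration sees the rows that were just appended instead of only the original CSV data.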

Upvotes: 1
