xcen

Reputation: 692

Pyspark: Expand pyspark dataframe adding missing periods

I have a pyspark dataframe as follows.

+-----+----+------+------+-----+-----+
|   id|year|period| val_1|val_2|val_3|
+-----+----+------+------+-----+-----+
|94734|2020|     5|160000|    0|    0|
|39066|2011|     1| 20000|    0|    0|
|66198|2013|     1| 22000|    0|    0|
|89691|2015|     5|150000|    0|    0|
|11653|2010|     1| 20000|    0|    0|
+-----+----+------+------+-----+-----+
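
For reference, the sample dataframe above could be created with something like the following (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sketch of the input dataframe shown above
df = spark.createDataFrame(
    [
        (94734, 2020, 5, 160000, 0, 0),
        (39066, 2011, 1, 20000, 0, 0),
        (66198, 2013, 1, 22000, 0, 0),
        (89691, 2015, 5, 150000, 0, 0),
        (11653, 2010, 1, 20000, 0, 0),
    ],
    ["id", "year", "period", "val_1", "val_2", "val_3"],
)
df.show()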

I am trying to expand the above dataframe into the one below: each row is split into one row per year covered by its period, a year_period column is added, and rows for the missing year periods are added as well.

+-----+----+-----------+------+-----+-----+-----+
|   id|year|year_period|period|val_1|val_2|val_3|
+-----+----+-----------+------+-----+-----+-----+
|94734|2020|  2020-2021|     1|32000|    0|    0|
|94734|2021|  2021-2022|     1|32000|    0|    0|
|94734|2022|  2022-2023|     1|32000|    0|    0|
|94734|2023|  2023-2024|     1|32000|    0|    0|
|94734|2024|  2024-2025|     1|32000|    0|    0|
|39066|2011|  2011-2012|     1|20000|    0|    0|
|66198|2013|  2013-2014|     1|22000|    0|    0|
|89691|2015|  2015-2016|     1|30000|    0|    0|
|89691|2016|  2016-2017|     1|30000|    0|    0|
|89691|2017|  2017-2018|     1|30000|    0|    0|
|89691|2018|  2018-2019|     1|30000|    0|    0|
|89691|2019|  2019-2020|     1|30000|    0|    0|
|11653|2010|  2010-2011|     1|20000|    0|    0|
|     |2012|  2012-2013|     1|    0|    0|    0|
|     |2014|  2014-2015|     1|    0|    0|    0|
+-----+----+-----------+------+-----+-----+-----+

I started with the code below.

import pyspark.sql.functions as F

cond1 = F.col("period") > 1
# repeat each row once per year covered by its period
new_df = df.withColumn("period", F.expr("explode(array_repeat(period,int(period)))"))
# split each value evenly across the repeated rows
new_df = new_df.withColumn("val_1", F.when(cond1, F.col("val_1")/F.col("period")).otherwise(F.col("val_1")))
new_df = new_df.withColumn("val_2", F.when(cond1, F.col("val_2")/F.col("period")).otherwise(F.col("val_2")))
new_df = new_df.withColumn("val_3", F.when(cond1, F.col("val_3")/F.col("period")).otherwise(F.col("val_3")))
new_df.show()

+-----+----+------+-------+-----+-----+
|   id|year|period|  val_1|val_2|val_3|
+-----+----+------+-------+-----+-----+
|94734|2020|     5|32000.0|    0|    0|
|94734|2020|     5|32000.0|    0|    0|
|94734|2020|     5|32000.0|    0|    0|
|94734|2020|     5|32000.0|    0|    0|
|94734|2020|     5|32000.0|    0|    0|
|39066|2011|     1|20000.0|    0|    0|
|66198|2013|     1|22000.0|    0|    0|
|89691|2015|     5|30000.0|    0|    0|
|89691|2015|     5|30000.0|    0|    0|
|89691|2015|     5|30000.0|    0|    0|
|89691|2015|     5|30000.0|    0|    0|
|89691|2015|     5|30000.0|    0|    0|
|11653|2010|     1|20000.0|    0|    0|
+-----+----+------+-------+-----+-----+

But I do not know how to continue from here to get the expected result. Any help would be appreciated. Thank you.

Upvotes: 2

Views: 183

Answers (1)

Ric S

Reputation: 9247

I believe you were already well on your way to a solution. Here is the code I would write to reach the desired output:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# same condition as in the question: rows whose period spans more than one year
cond1 = F.col("period") > 1

new_df = (
    # repeat each row once per year covered by its period
    df.withColumn("period", F.expr("explode(array_repeat(period,int(period)))"))
    # turn the repeated rows into consecutive years within each id
    .withColumn(
        "year",
        F.col("year")
        + F.row_number().over(Window.partitionBy("id").orderBy(F.lit(1)))
        - 1,
    )
    # split each value evenly across the rows created by the explode
    .withColumn(
        "val_1",
        F.when(cond1, F.col("val_1") / F.col("period")).otherwise(F.col("val_1")),
    )
    .withColumn(
        "val_2",
        F.when(cond1, F.col("val_2") / F.col("period")).otherwise(F.col("val_2")),
    )
    .withColumn(
        "val_3",
        F.when(cond1, F.col("val_3") / F.col("period")).otherwise(F.col("val_3")),
    )
    .withColumn("period", F.lit(1))
    .withColumn("year_period", F.concat_ws("-", F.col("year"), F.col("year") + 1))
)

new_df.show()

+-----+----+------+-------+-----+-----+-----------+
|   id|year|period|  val_1|val_2|val_3|year_period|
+-----+----+------+-------+-----+-----+-----------+
|11653|2010|     1|20000.0|  0.0|  0.0|  2010-2011|
|39066|2011|     1|20000.0|  0.0|  0.0|  2011-2012|
|66198|2013|     1|22000.0|  0.0|  0.0|  2013-2014|
|89691|2015|     1|30000.0|  0.0|  0.0|  2015-2016|
|89691|2016|     1|30000.0|  0.0|  0.0|  2016-2017|
|89691|2017|     1|30000.0|  0.0|  0.0|  2017-2018|
|89691|2018|     1|30000.0|  0.0|  0.0|  2018-2019|
|89691|2019|     1|30000.0|  0.0|  0.0|  2019-2020|
|94734|2020|     1|32000.0|  0.0|  0.0|  2020-2021|
|94734|2021|     1|32000.0|  0.0|  0.0|  2021-2022|
|94734|2022|     1|32000.0|  0.0|  0.0|  2022-2023|
|94734|2023|     1|32000.0|  0.0|  0.0|  2023-2024|
|94734|2024|     1|32000.0|  0.0|  0.0|  2024-2025|
+-----+----+------+-------+-----+-----+-----------+
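
Note that this does not yet produce the blank rows for calendar years covered by no id (2012 and 2014 in your expected output). A rough sketch of one way to append them, assuming the new_df built above and an active SparkSession named spark, could be:

# sketch only: append blank rows for years not covered by any id
bounds = new_df.agg(F.min("year").alias("lo"), F.max("year").alias("hi")).first()

all_years = spark.range(bounds["lo"], bounds["hi"] + 1).toDF("year")

missing = (
    all_years.join(new_df.select("year").distinct(), on="year", how="left_anti")
    .withColumn("id", F.lit(None).cast(new_df.schema["id"].dataType))
    .withColumn("period", F.lit(1))
    .withColumn("val_1", F.lit(0.0))
    .withColumn("val_2", F.lit(0.0))
    .withColumn("val_3", F.lit(0.0))
    .withColumn("year_period", F.concat_ws("-", F.col("year"), F.col("year") + 1))
)

final_df = new_df.unionByName(missing.select(*new_df.columns))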

Upvotes: 2
