Reputation: 692
I have a pyspark dataframe as follows.
+-----+----+------+------+-----+-----+
| id|year|period| val_1|val_2|val_3|
+-----+----+------+------+-----+-----+
|94734|2020| 5|160000| 0| 0|
|39066|2011| 1| 20000| 0| 0|
|66198|2013| 1| 22000| 0| 0|
|89691|2015| 5|150000| 0| 0|
|11653|2010| 1| 20000| 0| 0|
+-----+----+------+------+-----+-----+
I am trying to expand the above dataframe into the one below: each row should be spread over its period (one row per year, with each val column divided by the period), a year_period label should be added, and missing year periods should be filled in as well.
+-----+----+-----------+------+-----+-----+-----+
| id|year|year_period|period|val_1|val_2|val_3|
+-----+----+-----------+------+-----+-----+-----+
|94734|2020| 2020-2021| 1|32000| 0| 0|
|94734|2021| 2021-2022| 1|32000| 0| 0|
|94734|2022| 2022-2023| 1|32000| 0| 0|
|94734|2023| 2023-2024| 1|32000| 0| 0|
|94734|2024| 2024-2025| 1|32000| 0| 0|
|39066|2011| 2011-2012| 1|20000| 0| 0|
|66198|2013| 2013-2014| 1|22000| 0| 0|
|89691|2015| 2015-2016| 1|30000| 0| 0|
|89691|2016| 2016-2017| 1|30000| 0| 0|
|89691|2017| 2017-2018| 1|30000| 0| 0|
|89691|2018| 2018-2019| 1|30000| 0| 0|
|89691|2019| 2019-2020| 1|30000| 0| 0|
|11653|2010| 2010-2011| 1|20000| 0| 0|
| |2012| 2012-2013| 1| 0| 0| 0|
| |2014| 2014-2015| 1| 0| 0| 0|
+-----+----+-----------+------+-----+-----+-----+
I started with the code below.
import pyspark.sql.functions as F

cond1 = F.col("period") > 1

# Repeat each row `period` times by exploding an array of the period value
new_df = df.withColumn("period", F.expr("explode(array_repeat(period, int(period)))"))
# Split each value evenly across the repeated rows
new_df = new_df.withColumn("val_1", F.when(cond1, F.col("val_1") / F.col("period")).otherwise(F.col("val_1")))
new_df = new_df.withColumn("val_2", F.when(cond1, F.col("val_2") / F.col("period")).otherwise(F.col("val_2")))
new_df = new_df.withColumn("val_3", F.when(cond1, F.col("val_3") / F.col("period")).otherwise(F.col("val_3")))
new_df.show()
+-----+----+------+-------+-----+-----+
| id|year|period| val_1|val_2|val_3|
+-----+----+------+-------+-----+-----+
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|94734|2020| 5|32000.0| 0| 0|
|39066|2011| 1|20000.0| 0| 0|
|66198|2013| 1|22000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|89691|2015| 5|30000.0| 0| 0|
|11653|2010| 1|20000.0| 0| 0|
+-----+----+------+-------+-----+-----+
But I do not know how to continue from here to get the expected results. If someone can help, that would be great. Thank you.
Upvotes: 2
Views: 183
Reputation: 9247
I believe you were already well on your way to a solution; here is the code I would write to reach the desired output:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Same condition as in the question: only split values when period > 1
cond1 = F.col("period") > 1

new_df = (
    df.withColumn("period", F.expr("explode(array_repeat(period, int(period)))"))
    # Offset the year by the row's position within its id group (0, 1, 2, ...)
    .withColumn(
        "year",
        F.col("year")
        + F.row_number().over(Window.partitionBy("id").orderBy(F.lit(1)))
        - 1,
    )
    # Split each value evenly across the exploded rows
    .withColumn(
        "val_1",
        F.when(cond1, F.col("val_1") / F.col("period")).otherwise(F.col("val_1")),
    )
    .withColumn(
        "val_2",
        F.when(cond1, F.col("val_2") / F.col("period")).otherwise(F.col("val_2")),
    )
    .withColumn(
        "val_3",
        F.when(cond1, F.col("val_3") / F.col("period")).otherwise(F.col("val_3")),
    )
    # Each expanded row now covers exactly one year
    .withColumn("period", F.lit(1))
    # Build the "YYYY-YYYY" label from the offset year
    .withColumn("year_period", F.concat_ws("-", F.col("year"), F.col("year") + 1))
)

new_df.show()
+-----+----+------+-------+-----+-----+-----------+
| id|year|period| val_1|val_2|val_3|year_period|
+-----+----+------+-------+-----+-----+-----------+
|11653|2010| 1|20000.0| 0.0| 0.0| 2010-2011|
|39066|2011| 1|20000.0| 0.0| 0.0| 2011-2012|
|66198|2013| 1|22000.0| 0.0| 0.0| 2013-2014|
|89691|2015| 1|30000.0| 0.0| 0.0| 2015-2016|
|89691|2016| 1|30000.0| 0.0| 0.0| 2016-2017|
|89691|2017| 1|30000.0| 0.0| 0.0| 2017-2018|
|89691|2018| 1|30000.0| 0.0| 0.0| 2018-2019|
|89691|2019| 1|30000.0| 0.0| 0.0| 2019-2020|
|94734|2020| 1|32000.0| 0.0| 0.0| 2020-2021|
|94734|2021| 1|32000.0| 0.0| 0.0| 2021-2022|
|94734|2022| 1|32000.0| 0.0| 0.0| 2022-2023|
|94734|2023| 1|32000.0| 0.0| 0.0| 2023-2024|
|94734|2024| 1|32000.0| 0.0| 0.0| 2024-2025|
+-----+----+------+-------+-----+-----+-----------+
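Note that this does not yet add the filler rows for the years that are missing entirely (2012 and 2014 in the expected output). Below is a minimal sketch of one way to add them, assuming the gap years should span the min-to-max year range of the expanded data and carry an empty id, period 1, and zero values, as in the question's expected output; the names bounds, missing, and filler are just illustrative.
# Sketch: fill gap years with empty-id, zero-value rows
# (assumes the desired range is min(year)..max(year) of the expanded data)
bounds = new_df.agg(F.min("year").alias("lo"), F.max("year").alias("hi"))
all_years = bounds.select(F.explode(F.sequence(F.col("lo"), F.col("hi"))).alias("year"))

# Keep only the years that have no rows at all
missing = all_years.join(new_df.select("year").distinct(), "year", "left_anti")

filler = (
    missing.withColumn("id", F.lit(""))  # empty id, as in the expected output
    .withColumn("period", F.lit(1))
    .withColumn("val_1", F.lit(0.0))
    .withColumn("val_2", F.lit(0.0))
    .withColumn("val_3", F.lit(0.0))
    .withColumn("year_period", F.concat_ws("-", F.col("year"), F.col("year") + 1))
)

# unionByName matches columns by name, so column order does not matter
result = new_df.unionByName(filler).orderBy("year")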
Upvotes: 2