Mandibajr

Reputation: 27

Add current month cumulative sum to a dataset

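from pyspark.sql import Window
import pyspark.sql.functions as F

# Running (cumulative) sum of every count column, ordered by month
df.select(
    *df.columns[:2],
    *[F.sum(F.col(i)).over(Window.orderBy('Month')).alias(i) for i in df.columns[2:]]
)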
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|Month  |month_index|QA_count|BS_count|BV_count|QT_count|B  |QB_count|BT_count|
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|2020-09|0          |3       |0       |1       |1       |2  |3       |7       |
|2020-10|1          |4       |1       |2       |2       |7  |12      |8       |
|2020-11|2          |5       |2       |3       |3       |12 |21      |9       |
|2020-12|3          |6       |3       |4       |4       |17 |30      |10      |
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
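To make explicit what `F.sum(...).over(Window.orderBy('Month'))` computes, here is a plain-Python sketch (not Spark code; the function name `range_running_sum` and the row dicts are hypothetical). When a window has an ordering but no explicit frame, Spark uses a growing RANGE frame from the first row up to the current row, so rows that share the same `Month` all receive the same running total. Sorting `'yyyy-MM'` strings is chronological because the month is zero-padded.

```python
from itertools import accumulate


def range_running_sum(rows, key, value):
    """Running total of `value` ordered by `key`, mimicking Spark's default
    ordered window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
    Rows that share the same `key` (peers) all get the same cumulative total."""
    # Sum the value per distinct key.
    per_key = {}
    for row in rows:
        per_key[row[key]] = per_key.get(row[key], 0) + row[value]
    # Running total over the distinct keys in ascending order.
    keys = sorted(per_key)
    totals = dict(zip(keys, accumulate(per_key[k] for k in keys)))
    # Each row (sorted by key) is assigned the running total of its key.
    return [{**row, value: totals[row[key]]} for row in sorted(rows, key=lambda r: r[key])]


# Hypothetical per-month (non-cumulative) rows; 2020-10 appears twice.
rows = [
    {'Month': '2020-10', 'QA_count': 1},
    {'Month': '2020-09', 'QA_count': 3},
    {'Month': '2020-10', 'QA_count': 0},
    {'Month': '2020-11', 'QA_count': 1},
]
print([r['QA_count'] for r in range_running_sum(rows, 'Month', 'QA_count')])
```

With a ROWS frame instead (`Window.orderBy('Month').rowsBetween(Window.unboundedPreceding, Window.currentRow)`), peer rows would be accumulated one row at a time rather than sharing a total.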


I currently have a dataset showing the cumulative sum of each column by month, like the one above. However, I would like the current month's row to be added automatically, even when there is no new data for it yet. My desired output would look similar to this:

+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|Month  |month_index|QA_count|BS_count|BV_count|QT_count|B  |QB_count|BT_count|
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|2020-09|0          |3       |0       |1       |1       |2  |3       |7       |
|2020-10|1          |4       |1       |2       |2       |7  |12      |8       |
|2020-11|2          |5       |2       |3       |3       |12 |21      |9       |
|2020-12|3          |6       |3       |4       |4       |17 |30      |10      |
|2021-01|4          |6       |3       |4       |4       |17 |30      |10      |
+-------+-----------+--------+--------+--------+--------+---+--------+--------+

P.S. When there is a new count for 2021-01, it should automatically be added to the cumulative sum.

Upvotes: 0

Views: 56

Answers (1)

mck

Reputation: 42392

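import pyspark.sql.functions as F
from pyspark.sql import Window

# Running (cumulative) sum of every count column, ordered by month
df2 = df.select(
    *df.columns[:2],
    *[F.sum(F.col(i)).over(Window.orderBy('Month')).alias(i) for i in df.columns[2:]]
)

# Latest month in the data and the current month, both as 'yyyy-MM' strings
last_month = df2.agg(F.max('Month')).head()[0]
current_month = df2.select(F.date_format(F.current_date(), 'yyyy-MM')).head()[0]

# Check whether the current month already has a row. If it doesn't,
# repeat the last row with the current month and the next month_index.
if last_month != current_month:
    df3 = df2.union(
        df2.orderBy(F.desc('Month')).limit(1)
           .withColumn('Month', F.date_format(F.current_date(), 'yyyy-MM'))
           .withColumn('month_index', F.col('month_index') + 1)
    )
else:
    df3 = df2

# union() does not guarantee row order, so sort before displaying
df3.orderBy('Month').show()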
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|  Month|month_index|QA_count|BS_count|BV_count|QT_count|  B|QB_count|BT_count|
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
|2020-09|          0|       3|       0|       1|       1|  2|       3|       7|
|2020-10|          1|       4|       1|       2|       2|  7|      12|       8|
|2020-11|          2|       5|       2|       3|       3| 12|      21|       9|
|2020-12|          3|       6|       3|       4|       4| 17|      30|      10|
|2021-01|          4|       6|       3|       4|       4| 17|      30|      10|
+-------+-----------+--------+--------+--------+--------+---+--------+--------+
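The `if` branch above appends at most one row and sets `month_index` to the last value plus one, so it assumes the current month immediately follows the last month with data. If several months can pass without new counts, one alternative is to build the full list of months up to the current month, treat months without data as zero, and take the running total; in Spark this could be expressed by left-joining a month calendar onto the monthly counts and filling nulls with 0 before the window sum. Below is a plain-Python sketch of that logic only, assuming the raw per-month (non-cumulative) counts are available as a non-empty dict and the current month is passed in as a `'yyyy-MM'` string (the function names and inputs are hypothetical).

```python
def month_range(start, end):
    """Yield 'yyyy-MM' strings from start to end, inclusive."""
    year, month = map(int, start.split('-'))
    end_year, end_month = map(int, end.split('-'))
    while (year, month) <= (end_year, end_month):
        yield f'{year:04d}-{month:02d}'
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


def cumulative_through(monthly_counts, current_month):
    """monthly_counts: {'yyyy-MM': {column: count}} with per-month counts.
    Returns one row per month from the earliest month up to current_month.
    Months without data add 0, so the previous totals are carried forward,
    and any new counts for a month are added to the running totals."""
    columns = sorted({c for counts in monthly_counts.values() for c in counts})
    running = dict.fromkeys(columns, 0)
    rows = []
    for index, month in enumerate(month_range(min(monthly_counts), current_month)):
        for c in columns:
            running[c] += monthly_counts.get(month, {}).get(c, 0)
        # {**running} copies the current totals into a new row dict
        rows.append({'Month': month, 'month_index': index, **running})
    return rows


counts = {'2020-11': {'QA_count': 2, 'B': 5}, '2020-12': {'QA_count': 1}}
for row in cumulative_through(counts, '2021-02'):
    print(row)
```

Here 2021-01 and 2021-02 repeat the 2020-12 totals; adding an entry such as `counts['2021-01'] = {'QA_count': 4}` makes both later rows include the new count.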

Upvotes: 1
