Toby Djelyinski

Reputation: 138

Joining on the previous month data for each month in the PySpark dataset

I have a monthly dataset in which each month contains N accounts. Some months introduce new accounts, and some accounts disappear after a certain month (this happens at random).

For each account, I need to subtract the previous month's balance from the current month's balance if the account existed in the previous month; otherwise the result should simply be the current month's balance.

It was suggested that I do a join on each month, i.e. join month1 to month2, month2 to month3, etc., but I am not sure how that would work.

Here is an example dataset:

|date      |account   |balance   |
----------------------------------
|01.01.2019|1         |40        |
|01.01.2019|2         |33        |
|01.01.2019|3         |31        |
|01.02.2019|1         |32        |
|01.02.2019|2         |56        |
|01.02.2019|4         |89        |
|01.03.2019|2         |12        |
|01.03.2019|4         |35        |
|01.03.2019|5         |76        |
|01.03.2019|6         |47        |
----------------------------------

Each account id is unique, whether the account has gone, is current, or is newly added.

I initially used f.lag, but now that accounts disappear and new ones come in, the number of accounts per month is not constant, so I cannot lag by a fixed number of rows. As I said, it was suggested that I use a join, i.e. join Jan onto Feb, Feb onto March, etc.

But I am not really sure how that would work. Does anyone have any ideas?

P.S. I created this table with examples of an account that stays, an account that is new, and an account that is removed from later months.

The end goal is:

|date      |account   |balance   | balance_diff_with_previous_month  |
--------------------------------------------------------------------|
|01.01.2019|1         |40        |na                                |
|01.01.2019|2         |33        |na                                |
|01.01.2019|3         |31        |na                                |
|01.02.2019|1         |32        |-8                                |
|01.02.2019|2         |56        |23                                |
|01.02.2019|4         |89        |89                                |
|01.03.2019|2         |12        |-44                               |
|01.03.2019|4         |35        |-54                               |
|01.03.2019|5         |76        |76                                |
|01.03.2019|6         |47        |47                                |
--------------------------------------------------------------------|

As I said, f.lag cannot be used because the number of accounts per month is not constant and I do not control it, so I cannot f.lag by a constant number of rows.

Does anyone have any ideas about how to join on account and/or date (current month) with date-1 (previous month)?

Thanks for reading and helping :)

Upvotes: 0

Views: 2223

Answers (3)

Nikhil Suthar

Reputation: 2431

>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
|      date|account|balance|
+----------+-------+-------+
|01.01.2019|      1|     40|
|01.01.2019|      2|     33|
|01.01.2019|      3|     31|
|01.02.2019|      1|     32|
|01.02.2019|      2|     56|
|01.02.2019|      4|     89|
|01.03.2019|      2|     12|
|01.03.2019|      4|     35|
|01.03.2019|      5|     76|
|01.03.2019|      6|     47|
+----------+-------+-------+

>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01|      1|     40|                            40.0|
|2019-01-01|      2|     33|                            33.0|
|2019-01-01|      3|     31|                            31.0|
|2019-02-01|      1|     32|                            -8.0|
|2019-02-01|      2|     56|                            23.0|
|2019-02-01|      4|     89|                            89.0|
|2019-03-01|      2|     12|                           -44.0|
|2019-03-01|      4|     35|                           -54.0|
|2019-03-01|      5|     76|                            76.0|
|2019-03-01|      6|     47|                            47.0|
+----------+-------+-------+--------------------------------+

Upvotes: 1

Ranga Vure

Reputation: 1932

An alternative solution using a join:

from pyspark.sql import functions as F

df = spark.createDataFrame([
            ("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
            ("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
            ("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
            ["date","account","balance"])

df.alias("current").join(
    df.alias("previous"),
    [
        # the previous row must be exactly one calendar month before the current row
        F.to_date(F.col("previous.date"), "dd.MM.yyyy")
            == F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"), -1),
        # and it must belong to the same account
        F.col("previous.account") == F.col("current.account"),
    ],
    "left"
).select(
    F.col("current.date").alias("date"),
    F.coalesce("current.account", "previous.account").alias("account"),
    F.col("current.balance").alias("balance"),
    (F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()

which results in:

+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019|      1|     40|                              40|
|01.01.2019|      2|     33|                              33|
|01.01.2019|      3|     31|                              31|
|01.02.2019|      1|     32|                              -8|
|01.02.2019|      2|     56|                              23|
|01.02.2019|      4|     89|                              89|
|01.03.2019|      2|     12|                             -44|
|01.03.2019|      4|     35|                             -54|
|01.03.2019|      5|     76|                              76|
|01.03.2019|      6|     47|                              47|
+----------+-------+-------+--------------------------------+
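
To sanity-check the Spark output on a small sample, the same rule (look up the same account in the previous calendar month, fall back to 0 if it is absent) can be written in plain Python. This is only a sketch; the helper names `previous_month` and `balance_diffs` are made up for illustration, and it assumes every date is the first day of a month, as in the question's data.

```python
from datetime import date

def previous_month(d):
    """Return the first day of the month before d (d is a first-of-month date)."""
    return date(d.year - 1, 12, 1) if d.month == 1 else date(d.year, d.month - 1, 1)

def balance_diffs(rows):
    """rows: (date, account, balance) tuples; returns rows with the diff appended."""
    rows = list(rows)
    by_key = {(d, acc): bal for d, acc, bal in rows}
    out = []
    for d, acc, bal in sorted(rows):
        # Balance of the same account one month earlier, or 0 if it did not exist
        prev = by_key.get((previous_month(d), acc), 0)
        out.append((d, acc, bal, bal - prev))
    return out

rows = [
    (date(2019, 1, 1), 1, 40), (date(2019, 1, 1), 2, 33), (date(2019, 1, 1), 3, 31),
    (date(2019, 2, 1), 1, 32), (date(2019, 2, 1), 2, 56), (date(2019, 2, 1), 4, 89),
    (date(2019, 3, 1), 2, 12), (date(2019, 3, 1), 4, 35), (date(2019, 3, 1), 5, 76),
    (date(2019, 3, 1), 6, 47),
]

for row in balance_diffs(rows):
    print(row)
```

Because the lookup is keyed on the calendar month, an account that skips a month is treated as new when it returns, which is the same behaviour as the left join.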

Upvotes: 2

LaSul

Reputation: 2411

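F.lag works perfectly for what you want if you partition by account and order by date:

from pyspark.sql import functions as F, Window

# Parse the dd.MM.yyyy strings so rows sort chronologically within each account
partition = Window.partitionBy("account") \
                  .orderBy(F.to_date("date", "dd.MM.yyyy"))

# lag() gives the previous row's balance for the same account (null on its first row)
data.withColumn("balance_diff_with_previous_month",
                F.col("balance") - F.lag("balance").over(partition)) \
    .show(10, False)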

Upvotes: 1
