Reputation: 138
I have a monthly dataset in which each month contains N accounts. Some months introduce new accounts, and some accounts disappear after a certain month (this happens at random).
For each account I need to take the current month's balance and subtract the previous month's balance from it (if the account existed in the previous month); otherwise the result should simply be the current month's balance.
It was suggested that I join each month to the next, i.e. join month1 to month2, month2 to month3, etc., but I am not sure how that would work.
Here is an example dataset:
|date      |account |balance |
----------------------------------
|01.01.2019|1       |40      |
|01.01.2019|2       |33      |
|01.01.2019|3       |31      |
|01.02.2019|1       |32      |
|01.02.2019|2       |56      |
|01.02.2019|4       |89      |
|01.03.2019|2       |12      |
|01.03.2019|4       |35      |
|01.03.2019|5       |76      |
|01.03.2019|6       |47      |
----------------------------------
Each account, whether it has left, is still current, or is newly added, has its own unique id.
I initially used f.lag, but now that accounts disappear and new ones come in, the number of accounts per month is not constant, so I cannot lag by a fixed number of rows. As I said, it was suggested that I use a join instead, i.e. join Jan onto Feb, Feb onto March, etc.
But I am not really sure how that would work. Does anyone have any ideas?
P.S. I built this table to include an account that stays, an account that is new, and an account that is removed from later months.
The end goal is:
|date      |account |balance |balance_diff_with_previous_month |
--------------------------------------------------------------------|
|01.01.2019|1       |40      |na                               |
|01.01.2019|2       |33      |na                               |
|01.01.2019|3       |31      |na                               |
|01.02.2019|1       |32      |-8                               |
|01.02.2019|2       |56      |23                               |
|01.02.2019|4       |89      |89                               |
|01.03.2019|2       |12      |-44                              |
|01.03.2019|4       |35      |-54                              |
|01.03.2019|5       |76      |76                               |
|01.03.2019|6       |47      |47                               |
--------------------------------------------------------------------|
As I said, f.lag cannot be used because the number of accounts per month is not constant and I do not control it, so I cannot lag by a constant number of rows.
Does anyone have any ideas on how to join on account and date, matching the current month with the previous month (date-1)?
Thanks for reading and helping :)
Upvotes: 0
Views: 2223
Reputation: 2431
>>> from pyspark.sql.functions import col, expr, lag
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
| date|account|balance|
+----------+-------+-------+
|01.01.2019| 1| 40|
|01.01.2019| 2| 33|
|01.01.2019| 3| 31|
|01.02.2019| 1| 32|
|01.02.2019| 2| 56|
|01.02.2019| 4| 89|
|01.03.2019| 2| 12|
|01.03.2019| 4| 35|
|01.03.2019| 5| 76|
|01.03.2019| 6| 47|
+----------+-------+-------+
>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01| 1| 40| 40.0|
|2019-01-01| 2| 33| 33.0|
|2019-01-01| 3| 31| 31.0|
|2019-02-01| 1| 32| -8.0|
|2019-02-01| 2| 56| 23.0|
|2019-02-01| 4| 89| 89.0|
|2019-03-01| 2| 12| -44.0|
|2019-03-01| 4| 35| -54.0|
|2019-03-01| 5| 76| 76.0|
|2019-03-01| 6| 47| 47.0|
+----------+-------+-------+--------------------------------+
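One caveat worth checking: lag looks at the previous row of each account, not the previous calendar month. On the sample data the two are the same, but if an account could ever skip a month and come back, the results would differ from the rule stated in the question. The plain-Python sketch below (no Spark; the helper names and the two-row dataset are made up for illustration) contrasts the two behaviours:

```python
from datetime import datetime

def month_index(date_str):
    """Map a 'dd.MM.yyyy' string to a running month number."""
    d = datetime.strptime(date_str, "%d.%m.%Y")
    return d.year * 12 + d.month

def diff_previous_row(rows):
    """Mimics lag(balance, 1, 0) per account: subtract whatever row came before."""
    last_seen = {}
    out = []
    for date, account, balance in sorted(rows, key=lambda r: (month_index(r[0]), r[1])):
        out.append((date, account, balance - last_seen.get(account, 0)))
        last_seen[account] = balance
    return out

def diff_previous_month(rows):
    """Subtract a balance only if the account has a row in the month before."""
    by_key = {(month_index(d), a): b for d, a, b in rows}
    out = []
    for date, account, balance in sorted(rows, key=lambda r: (month_index(r[0]), r[1])):
        prev = by_key.get((month_index(date) - 1, account), 0)
        out.append((date, account, balance - prev))
    return out

# Hypothetical data: account 1 is missing in February and returns in March.
rows = [("01.01.2019", 1, 40), ("01.03.2019", 1, 32)]
print(diff_previous_row(rows))    # March diff is -8 (compared with January)
print(diff_previous_month(rows))  # March diff is 32 (no February row)
```

If accounts never reappear after leaving, the window approach above is enough; otherwise a join on the previous month matches the stated rule more closely.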
Upvotes: 1
Reputation: 1932
An alternative solution using a join:
from pyspark.sql import functions as F

df = spark.createDataFrame([
("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
["date","account","balance"])
# Left self-join: match each row to the same account's row one month earlier.
df.alias("current").join(
df.alias("previous"),
[F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"), -1),
 F.col("previous.account") == F.col("current.account")],
"left"
).select(
F.col("current.date").alias("date"),
F.coalesce("current.account", "previous.account").alias("account"),
F.col("current.balance").alias("balance"),
# No match in the previous month: subtract 0, so the diff is the current balance.
(F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()
which results in:
+----------+-------+-------+--------------------------------+
| date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019| 1| 40| 40|
|01.01.2019| 2| 33| 33|
|01.01.2019| 3| 31| 31|
|01.02.2019| 1| 32| -8|
|01.02.2019| 2| 56| 23|
|01.02.2019| 4| 89| 89|
|01.03.2019| 2| 12| -44|
|01.03.2019| 4| 35| -54|
|01.03.2019| 5| 76| 76|
|01.03.2019| 6| 47| 47|
+----------+-------+-------+--------------------------------+
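Note that the goal table in the question shows "na" for the first month, while the output above reports the balance itself there. If "na" is really wanted, one option is to blank out the diff for rows in the earliest month as a final step. The plain-Python sketch below shows that step on a few collected rows; the function name and the sample tuples are illustrative only. In Spark the same idea could probably be expressed with a `when` condition on the minimum date, which is not shown here.

```python
from datetime import datetime

def blank_first_month(rows):
    """Replace the diff with None for rows that fall in the earliest month."""
    def parse(s):
        return datetime.strptime(s, "%d.%m.%Y")
    first = min(parse(date) for date, _, _, _ in rows)
    return [
        (date, account, balance, None if parse(date) == first else diff)
        for date, account, balance, diff in rows
    ]

# A few (date, account, balance, diff) rows taken from the output above.
result = [
    ("01.01.2019", 1, 40, 40),
    ("01.02.2019", 1, 32, -8),
    ("01.02.2019", 4, 89, 89),
]
print(blank_first_month(result))
```

Accounts that first appear in a later month (such as account 4) keep their balance as the diff, which agrees with the goal table.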
Upvotes: 2
Reputation: 2411
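F.lag works perfectly for what you want if you partition by account and order by date:
from pyspark.sql import Window, functions as F

# Assumes "date" is a 'dd.MM.yyyy' string, as in the question.
partition = Window.partitionBy("account") \
    .orderBy(F.to_date("date", "dd.MM.yyyy"))
# lag(..., 1, 0) falls back to 0 when the account has no earlier row.
previousAmount = data.withColumn(
    "balance_diff_with_previous_month",
    F.col("balance") - F.lag("balance", 1, 0).over(partition))
previousAmount.show(10, False)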
Upvotes: 1