Reputation: 75
I have a dataframe with some events, along with the date (if) an action was taken, along with the agent that made the action, as shown below:
create_date agent action_taken action_date
0 2021-10-12 agent1 1 2021-10-16
1 2021-10-16 agent1 1 2021-10-19
2 2021-10-19 agent1 1 2021-10-19
3 2021-10-24 agent1 1 2021-10-25
4 2021-10-12 agent2 1 2021-12-12
5 2021-10-23 agent2 1 2021-10-23
6 2021-11-10 agent2 1 2021-11-19
7 2021-11-11 agent2 0 None
8 2021-11-12 agent2 1 2021-11-12
9 2021-11-24 agent2 1 2021-12-14
Each row is an event. The create_date
is the date on which an event was created. Each event is assigned to an agent, who may/may not take an action on it (denoted in the action_taken
column). Only if the agent takes an action, the action_taken
column is populated with a 1
, and the action_date
column is populated with the date when the action was taken (when the row/event is created, no action taken would be taken yet, which would populate 0
under action_taken
and None
under action_date
) . The action_taken
and action_date
columns are updated retroactively (not at time of event creation). that information is not available . I would like the agg
column in each row to be populated with the number of actions that had been taken by the agent assigned to the event, in the 30 days prior to the the create_date
of each event.
The use case is that I want to create a training set to predict whether a given agent will perform an action based on historical trends. The prediction is to be generated at the time of event creation, so if an action (for any past event) is taken after the creation of the new event, this information would not have been available at the time when the event was created/prediction was made, and therefore should not be included in the aggregation/training set so as to avoid temporal leakage.
Essentially, I am looking for guidance on creating a rolling aggregation (count/sum) of the action_taken
column looking back 30 days from the create_date
, grouped on the agent
column – however, if the action_date
occurred after the create_date in a particular row, it should not be included in the aggregate. For instance, for an event created on 2021-10-23
, the aggregate cannot include any actions that occurred after the preceding 30 days of the create_date
, so in the above example, the action on row 5 (index 4) for agent2
would not be included because the action_date
is 2021-12-12
.
The expected solution would be:
create_date agent action_taken action_date agg
0 2021-10-12 agent1 1 2021-10-16 0
1 2021-10-16 agent1 1 2021-10-19 0
2 2021-10-19 agent1 1 2021-10-19 1
3 2021-10-24 agent1 1 2021-10-25 3
4 2021-10-12 agent2 1 2021-12-12 0
5 2021-10-23 agent2 1 2021-10-23 0
6 2021-11-10 agent2 1 2021-11-19 1
7 2021-11-11 agent2 0 None 1
8 2021-11-12 agent2 1 2021-11-12 1
9 2021-11-24 agent2 1 2021-12-14 2
To recreate the dataframe:
pd.DataFrame{'create_date': ['2021-10-12', '2021-10-16', '2021-10-19',
'2021-10-24', '2021-10-12', '2021-10-23',
'2021-11-10', '2021-11-11', '2021-11-12', '2021-11-24'
],
'agent': ['agent1', 'agent1', 'agent1',
'agent1', 'agent2', 'agent2',
'agent2', 'agent2', 'agent2', 'agent2'
],
'action_taken': [1, 1, 1,
1, 1, 1,
1, 0, 1, 1
],
'action_date': ['2021-10-16', '2021-10-19', '2021-10-19',
'2021-10-25', '2021-12-12', '2021-10-23',
'2021-11-19', None, '2021-11-12', '2021-12-14'
],
}
Would prefer help using python/pandas, but pyspark solutions are also equally appreciated.
Thanks.
Upvotes: 1
Views: 87
Reputation: 26676
w=Window.partitionBy('agent').orderBy('create_date')
df = (
#Coerce dates to date
df.withColumn('create_date', to_date('create_date')).withColumn('action_date', to_date('action_date'))
#Create new column with a list of action_dates preceding the current row but include the current rows date
.withColumn('recentaction', collect_list('action_date').over(w))
#Compute the last 3o days
.withColumn('last30days',date_sub(col("create_date"),30))
#Find if the dates in the list are between (create_date-30 days) and the day before create_date
.withColumn('counter', expr("transform(recentaction, x -> cast(x BETWEEN last30days and date_sub(create_date,1) as integer))"))
# sum up the outcome from above
.withColumn('agg', expr("reduce(counter,0, (x,i)-> x+i)")))
df.select('create_date', 'agent', 'action_taken', 'action_date','agg').show(truncate=False)
+-----------+------+------------+-----------+---+
|create_date|agent |action_taken|action_date|agg|
+-----------+------+------------+-----------+---+
|2021-10-12 |agent1|1 |2021-10-16 |0 |
|2021-10-16 |agent1|1 |2021-10-19 |0 |
|2021-10-19 |agent1|1 |2021-10-19 |1 |
|2021-10-24 |agent1|1 |2021-10-25 |3 |
|2021-10-12 |agent2|1 |2021-12-12 |0 |
|2021-10-23 |agent2|1 |2021-10-23 |0 |
|2021-11-10 |agent2|1 |2021-11-19 |1 |
|2021-11-11 |agent2|0 |null |1 |
|2021-11-12 |agent2|1 |2021-11-12 |1 |
|2021-11-24 |agent2|1 |2021-12-14 |2 |
+-----------+------+------------+-----------+---+
Upvotes: 2