Anakimi

Reputation: 75

Creating a rolling window aggregation by only including rows that satisfy a condition

I have a dataframe of events, along with the agent each event is assigned to and the date on which an action was taken (if one was taken), as shown below:

  create_date   agent  action_taken action_date
0  2021-10-12  agent1             1  2021-10-16
1  2021-10-16  agent1             1  2021-10-19
2  2021-10-19  agent1             1  2021-10-19
3  2021-10-24  agent1             1  2021-10-25
4  2021-10-12  agent2             1  2021-12-12
5  2021-10-23  agent2             1  2021-10-23
6  2021-11-10  agent2             1  2021-11-19
7  2021-11-11  agent2             0        None
8  2021-11-12  agent2             1  2021-11-12
9  2021-11-24  agent2             1  2021-12-14

Each row is an event. The create_date is the date on which the event was created. Each event is assigned to an agent, who may or may not take an action on it (denoted in the action_taken column). If the agent takes an action, action_taken is populated with a 1 and action_date with the date the action was taken; when an event is first created no action has been taken yet, so action_taken starts at 0 and action_date at None. These two columns are updated retroactively (not at the time of event creation), so that information is not available when the event is created. I would like an agg column in each row, populated with the number of actions taken by the agent assigned to the event in the 30 days prior to that event's create_date.

The use case is that I want to create a training set to predict whether a given agent will perform an action, based on historical trends. The prediction is to be generated at the time of event creation, so if an action (on any past event) is taken after the new event is created, that information would not have been available when the event was created and the prediction was made, and therefore should not be included in the aggregation/training set, so as to avoid temporal leakage.

Essentially, I am looking for guidance on creating a rolling aggregation (count/sum) of the action_taken column, grouped by the agent column, looking back 30 days from each row's create_date – however, if an action_date occurred after that row's create_date, it should not be included in the aggregate. For instance, for the event created on 2021-10-23, the aggregate can only include actions taken in the 30 days preceding that create_date, so in the above example the action on row 5 (index 4) for agent2 would not be included because its action_date is 2021-12-12. A brute-force sketch of this logic is included after the recreation snippet below.

The expected output would be:

  create_date   agent  action_taken action_date  agg
0  2021-10-12  agent1             1  2021-10-16    0
1  2021-10-16  agent1             1  2021-10-19    0
2  2021-10-19  agent1             1  2021-10-19    1
3  2021-10-24  agent1             1  2021-10-25    3
4  2021-10-12  agent2             1  2021-12-12    0
5  2021-10-23  agent2             1  2021-10-23    0
6  2021-11-10  agent2             1  2021-11-19    1
7  2021-11-11  agent2             0        None    1
8  2021-11-12  agent2             1  2021-11-12    1
9  2021-11-24  agent2             1  2021-12-14    2

To recreate the dataframe:

import pandas as pd

df = pd.DataFrame({'create_date': ['2021-10-12', '2021-10-16', '2021-10-19',
                                   '2021-10-24', '2021-10-12', '2021-10-23',
                                   '2021-11-10', '2021-11-11', '2021-11-12', '2021-11-24'
                                   ],
                   'agent': ['agent1', 'agent1', 'agent1',
                             'agent1', 'agent2', 'agent2',
                             'agent2', 'agent2', 'agent2', 'agent2'
                             ],
                   'action_taken': [1, 1, 1,
                                    1, 1, 1,
                                    1, 0, 1, 1
                                    ],
                   'action_date': ['2021-10-16', '2021-10-19', '2021-10-19',
                                   '2021-10-25', '2021-12-12', '2021-10-23',
                                   '2021-11-19', None, '2021-11-12', '2021-12-14'
                                   ],
                   })
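
To make the target concrete, here is a naive per-row loop that reproduces the agg column in the expected output (not performant, and the strict "action_date < create_date" comparison is my reading of the expected output – actions taken on the create_date itself are excluded). I'm hoping for a more idiomatic/vectorised version of this:

create = pd.to_datetime(df['create_date'])
action = pd.to_datetime(df['action_date'])   # None becomes NaT, which fails every comparison

def count_prior_actions(i):
    window_start = create[i] - pd.Timedelta(days=30)
    same_agent = df['agent'] == df.loc[i, 'agent']
    # Actions by this agent within the 30 days strictly before this row's create_date
    in_window = (action >= window_start) & (action < create[i])
    return int((same_agent & in_window).sum())

df['agg'] = [count_prior_actions(i) for i in df.index]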

Would prefer help using python/pandas, but pyspark solutions are equally appreciated.

Thanks.

Upvotes: 1

Views: 87

Answers (1)

wwnde

Reputation: 26676

from pyspark.sql.functions import to_date, collect_list, date_sub, col, expr
from pyspark.sql.window import Window

w = Window.partitionBy('agent').orderBy('create_date')
df = (
  # Coerce the string columns to dates
  df.withColumn('create_date', to_date('create_date')).withColumn('action_date', to_date('action_date'))
  # Collect, per agent, the action_dates of all rows whose create_date is up to and including the current row's
  .withColumn('recentaction', collect_list('action_date').over(w))
  # Compute the start of the 30-day lookback window
  .withColumn('last30days', date_sub(col("create_date"), 30))
  # Flag which dates in the list fall between (create_date - 30 days) and the day before create_date
  .withColumn('counter', expr("transform(recentaction, x -> cast(x BETWEEN last30days AND date_sub(create_date,1) as integer))"))
  # Sum up the flags from above
  .withColumn('agg', expr("reduce(counter, 0, (x, i) -> x + i)")))

df.select('create_date', 'agent', 'action_taken', 'action_date', 'agg').show(truncate=False)


+-----------+------+------------+-----------+---+
|create_date|agent |action_taken|action_date|agg|
+-----------+------+------------+-----------+---+
|2021-10-12 |agent1|1           |2021-10-16 |0  |
|2021-10-16 |agent1|1           |2021-10-19 |0  |
|2021-10-19 |agent1|1           |2021-10-19 |1  |
|2021-10-24 |agent1|1           |2021-10-25 |3  |
|2021-10-12 |agent2|1           |2021-12-12 |0  |
|2021-10-23 |agent2|1           |2021-10-23 |0  |
|2021-11-10 |agent2|1           |2021-11-19 |1  |
|2021-11-11 |agent2|0           |null       |1  |
|2021-11-12 |agent2|1           |2021-11-12 |1  |
|2021-11-24 |agent2|1           |2021-12-14 |2  |
+-----------+------+------------+-----------+---+
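
Note that this assumes df is already a Spark DataFrame holding the question's data as strings. A minimal sketch of that setup, converting the pandas frame from the question (session and variable names here are just illustrative), could be:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Build the Spark DataFrame from the pandas frame defined in the question;
# from here on, df refers to the Spark DataFrame used above.
df = spark.createDataFrame(df[['create_date', 'agent', 'action_taken', 'action_date']])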

Upvotes: 2
