What would be an efficient approach to replicate pandas
rolling window functionality on live streaming data?
Suppose we want to maintain a sum of the last n observations: collections.deque with the maxlen parameter is the way to go. But what if, instead of a fixed n, we need the sum of the values from the last m seconds?
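For the fixed-n case, a minimal sketch of what I mean (names are illustrative) would be:

from collections import deque

n = 3
window = deque(maxlen=n)    # the oldest value falls off automatically

total = 0.0

def push(value):
    global total
    if len(window) == window.maxlen:
        total -= window[0]  # subtract the value that append() will evict
    window.append(value)
    total += value          # running sum stays O(1) per observation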
Using a pandas.Series would be inefficient, since the underlying numpy arrays cannot grow in place and are thus unsuitable for handling live data streams: each append copies the whole array.
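A quick illustration of that copy cost (using pd.concat, since Series.append was removed in recent pandas):

import pandas as pd

# Each iteration allocates a brand-new backing array and copies all
# previous values into it, so streaming appends cost O(n) apiece.
s = pd.Series(dtype=float)
for value in (1.0, 2.0, 3.0):
    s = pd.concat([s, pd.Series([value])], ignore_index=True)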
Something like cachetools.TTLCache is good for storing the live data, but inefficient for calculations: getting the sum would require iterating over every element each time.
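For example, with TTLCache the storage side is easy but every query re-scans the window (keying entries by arrival time here is just one illustrative choice):

import time
from cachetools import TTLCache

cache = TTLCache(maxsize=100_000, ttl=5)    # entries expire after 5 s
cache[time.time()] = 42.0                   # e.g. keyed by arrival time
window_sum = sum(cache.values())            # O(n) full scan per query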
Currently my approach for maintaining a live data stream sum uses a collections.deque, a time-to-live parameter, and a while loop for discarding old values:
import time
from collections import deque

class Accumulator:
    def __init__(self, ttl):
        self.ttl = ttl       # window length in seconds
        self.sum = 0
        self.q = deque()     # (timestamp, value) pairs, oldest on the left

    def append(self, value):
        self.q.append((time.time(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        cutoff_time = time.time() - self.ttl
        try:
            # Pop expired entries from the left until the head is fresh.
            while self.q[0][0] < cutoff_time:
                self.sum -= self.q.popleft()[1]
        except IndexError:   # deque is empty
            pass

    def get_sum(self):
        self.discard_old_values()
        return self.sum
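For reference, usage looks like this (the sleep is only there to show expiry):

acc = Accumulator(ttl=5)
acc.append(10)
acc.append(20)
print(acc.get_sum())    # 30 - both values are younger than 5 s
time.sleep(6)
print(acc.get_sum())    # 0 - both values have expired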
So the questions are:

1. Is there a more elegant way to avoid the while loop here?
2. Are there any ways to make pandas rolling windows work on mutable collections?
Here is my proposed improvement to discard_old_values:
def discard_old_values(self):
    die = time.time() - self.ttl        # compute the cutoff once
    # Explicit emptiness check: no try/except needed.
    while self.q and self.q[0][0] < die:
        self.sum -= self.q.popleft()[1]
Instead of calling time.time() for each element that may be discarded, I call it once and do the arithmetic to find the "die time" only once. This saves the cost of throwing exceptions and the cost of repeated calls to time.time().
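With the explicit emptiness check, the method is also safe to call on an empty queue, which a quick smoke test shows (assuming the method above is dropped into the Accumulator class from the question):

acc = Accumulator(ttl=1)
acc.discard_old_values()    # no exception on an empty deque
acc.append(1)
time.sleep(1.1)
print(acc.get_sum())        # 0 - the expired value was discarded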