Edward Khachatryan

Efficient time-bound queue in Python?

What would be an efficient approach to replicate pandas rolling window functionality on live streaming data?

Suppose we want to maintain the sum of the last n observations. For that, collections.deque with the maxlen parameter is the way to go. But what if, instead of a fixed n, we need the sum of the values from the last m seconds?
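For reference, the fixed-n case can be sketched as below (the class name `CountWindowSum` is hypothetical, and n is assumed to be at least 1). A deque created with `maxlen` silently drops its leftmost item when a new one is appended to a full deque, so the running sum has to subtract that item first.

```python
from collections import deque


class CountWindowSum:
    """Running sum of the last n values (hypothetical helper; assumes n >= 1)."""

    def __init__(self, n):
        self.q = deque(maxlen=n)
        self.sum = 0

    def append(self, value):
        # When the deque is full, append() will evict q[0], so remove it
        # from the running sum before it disappears.
        if len(self.q) == self.q.maxlen:
            self.sum -= self.q[0]
        self.q.append(value)
        self.sum += value


w = CountWindowSum(3)
for v in [1, 2, 3, 4, 5]:
    w.append(v)
print(w.sum)  # 12 (3 + 4 + 5)
```

Each append is O(1) and no loop is needed, because exactly one value leaves the window per insertion. With a time-based window, an append can expire zero, one, or many values, which is where the difficulty comes from.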

Using pandas.Series would be inefficient because the underlying NumPy arrays have a fixed size, which makes them unsuitable for live data streams: each append allocates a new array and copies the whole thing.

Something like cachetools.TTLCache is good for storing the live data but inefficient for calculations: getting the sum would require iterating over every element each time.

Currently, my approach to maintaining the sum of a live data stream uses collections.deque, a time-to-live (ttl) parameter, and a while loop that discards old values:

import time
from collections import deque

class Accumulator:
    def __init__(self, ttl):
        self.ttl = ttl
        self.sum = 0
        self.q = deque()

    def append(self, value):
        self.q.append((time.time(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        cutoff_time = time.time() - self.ttl
        try:
            while self.q[0][0] < cutoff_time:
                self.sum -= self.q.popleft()[1]
        except IndexError:
            pass

    def get_sum(self):
        self.discard_old_values()
        return self.sum

So the questions are:

  1. Can we do better?
  2. Is there a way to get rid of the ugly while loop here?
  3. What's the common name for this type of "TTL queue" data structure?
  4. Is there a popular Python library that already implements it?
  5. Is there a way to utilize pandas rolling windows on mutable collections?
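Regarding question 2, one alternative (a swapped-in technique, not what the code above does) is to store prefix sums alongside the timestamps. A window sum then becomes a binary search with the standard `bisect` module plus one subtraction, with no loop at query time. This is a sketch under stated assumptions: the names `PrefixSumWindow` and `window_sum` are hypothetical, timestamps must arrive in non-decreasing order, and the lists grow without bound unless old entries are trimmed periodically. With float values, long-running prefix sums can also accumulate rounding error.

```python
import bisect


class PrefixSumWindow:
    """Hypothetical alternative: window sums via prefix sums and binary search.

    Assumes timestamps are appended in non-decreasing order.
    """

    def __init__(self):
        self.times = []    # timestamps, non-decreasing
        self.prefix = [0]  # prefix[i] = sum of the first i values

    def append(self, t, value):
        self.times.append(t)
        self.prefix.append(self.prefix[-1] + value)

    def window_sum(self, now, ttl):
        # First index whose timestamp is >= now - ttl. Entries before it are
        # excluded, matching the question's strict "< cutoff" discard test.
        i = bisect.bisect_left(self.times, now - ttl)
        return self.prefix[-1] - self.prefix[i]


w = PrefixSumWindow()
for t, v in [(0.0, 1), (1.0, 2), (2.5, 3), (4.0, 4)]:
    w.append(t, v)
print(w.window_sum(now=4.0, ttl=2.0))  # 7 (values at t=2.5 and t=4.0)
```

A side effect of this design is that the same stored data can answer queries for any window length, at the cost of memory. The deque-based loop, by contrast, is already amortized O(1) per element, since each value is appended once and popped at most once.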

Answers (1)

Alistair Carscadden

Here is my proposed improvement to discard_old_values:

def discard_old_values(self):
    die = time.time() - self.ttl
    while self.q and self.q[0][0] < die:
        self.sum -= self.q.popleft()[1]

Rather than calling time.time() for each element that may be discarded, I call it once and compute the "die time" from that single reading.

Checking for an empty deque in the loop condition avoids raising and catching an IndexError, and the clock is read only once per cleanup.
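Putting the proposed discard_old_values back into the question's Accumulator gives a sketch like the one below. The `clock` parameter is a hypothetical addition so the class can be exercised with fake timestamps instead of sleeping; it defaults to `time.monotonic`, which, unlike `time.time`, cannot go backwards when the system clock is adjusted.

```python
import time
from collections import deque


class Accumulator:
    """The question's Accumulator with the answer's discard loop.

    `clock` is a hypothetical addition for testability (default: time.monotonic).
    """

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self.sum = 0
        self.q = deque()  # (timestamp, value) pairs, oldest on the left

    def append(self, value):
        self.q.append((self.clock(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        die = self.clock() - self.ttl  # one clock read per cleanup
        # Testing emptiness first means no IndexError is raised or caught.
        while self.q and self.q[0][0] < die:
            self.sum -= self.q.popleft()[1]

    def get_sum(self):
        self.discard_old_values()
        return self.sum


# Usage with a fake clock: each clock() call returns the current value of t[0].
t = [0.0]
acc = Accumulator(ttl=5, clock=lambda: t[0])
for ts, v in [(0.0, 1), (2.0, 2), (4.0, 3)]:
    t[0] = ts
    acc.append(v)
t[0] = 6.0
print(acc.get_sum())  # 5 (the value appended at t=0.0 has expired)
```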
