Rook

Reputation: 21

Python Deque - 10 minutes worth of data

I'm trying to write a script that, when executed, appends a new available piece of information and removes data that's over 10 minutes old.

I'm wondering what the most efficient way is, performance-wise, to keep track of the timestamp of each element while also removing data that is over 10 minutes old.

My novice thought would be to append the information with a time stamp - [info, time] - to the deque and in a while loop continuously evaluate the end of the deque to remove anything older than 10 minutes... I doubt this is the best way.

Can someone provide an example? Thanks.

Upvotes: 2

Views: 1109

Answers (4)

Callam Delaney

Reputation: 660

I wrote a class for this, based on collections.deque:

import collections
from dataclasses import dataclass
import typing
import time
import logging

log = logging.getLogger(__name__)


@dataclass
class DequeItem:
    time: float
    value: typing.Any


class TimeDeque:
    """
    A timedeque is a deque which will keep records for the specified number of seconds.
    """

    def __init__(
        self, time_window_seconds: int = 600, get_time_fn: typing.Callable = time.time
    ):
        self.time_window_seconds = time_window_seconds
        self.get_time_fn = get_time_fn
        self._deque = collections.deque()

    def prune(self) -> bool:
        """
        Remove any items from the left of the deque which are older than
        `self.time_window_seconds`. Can probably be called less and save
        some time if we delegate use of this to the programmer.
        """
        cutoff = self.get_time_fn() - self.time_window_seconds
        pruned = 0
        # Items are appended on the right, so the oldest one is at index 0.
        while self._deque and self._deque[0].time < cutoff:
            self._deque.popleft()
            pruned += 1

        log.debug("Pruned %d items from deque", pruned)
        return True

    def append(self, item) -> bool:
        now = self.get_time_fn()  # named `now` to avoid shadowing the time module
        self._deque.append(DequeItem(now, item))
        # Call prune() directly: an assert would be skipped under `python -O`.
        self.prune()
        return True

    def __len__(self) -> int:
        self.prune()
        return len(self._deque)

    def __iter__(self):
        self.prune()
        for ob in self._deque:
            yield ob.value

You can use it like this:


# assumes the class above is saved as timedeque.py
import timedeque

# create a timedeque for 10 minutes worth of data
td = timedeque.TimeDeque(600)

# append data to your queue
td.append(1)
td.append(2)
td.append(3)

# perform some operation on the deque

agg = sum(td)
# agg = 6
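The `get_time_fn` parameter also makes the expiry behaviour easy to check without waiting ten minutes. The following is a minimal sketch, not the class above verbatim: `MiniTimeDeque` is a condensed stand-in so the snippet runs on its own, and `FakeClock` is a hypothetical helper invented for the illustration.

```python
import collections
import typing


class MiniTimeDeque:
    """Condensed stand-in for the TimeDeque class above (hypothetical name)."""

    def __init__(self, window_seconds: float, get_time_fn: typing.Callable[[], float]):
        self.window_seconds = window_seconds
        self.get_time_fn = get_time_fn
        self._deque = collections.deque()  # (timestamp, value), oldest on the left

    def prune(self) -> None:
        cutoff = self.get_time_fn() - self.window_seconds
        while self._deque and self._deque[0][0] < cutoff:
            self._deque.popleft()

    def append(self, value) -> None:
        self._deque.append((self.get_time_fn(), value))
        self.prune()

    def __iter__(self):
        self.prune()
        return iter([value for _, value in self._deque])


class FakeClock:
    """Hypothetical manual clock: time only moves when advance() is called."""

    def __init__(self, start: float = 0.0):
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
td = MiniTimeDeque(600, get_time_fn=clock)
td.append(1)        # stamped at t=0
clock.advance(300)
td.append(2)        # stamped at t=300
clock.advance(301)  # now t=601, so the first item is 601 s old
td.append(3)        # stamped at t=601; appending prunes the item from t=0
remaining = list(td)
print(remaining)    # [2, 3]
```

Passing a clock in, rather than calling `time.time` directly, is what keeps the window logic testable.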

Upvotes: 1

abarnert

Reputation: 366203

Yet another answer, with even more constraints:

If you can bucket things with, e.g., one-minute precision, all you need is 10 lists.

Unlike my other constrained answer, this doesn't require that you only ever append on the right; you can append in the middle (although you'll come after any other values for the same minute).

The downside is that you can't actually remove everything more than 10 minutes old; you can only remove everything in the 10th bucket, which could be off by up to 1 minute. You can choose what this means by choosing how to round:

  • Truncate one way, and nothing ever gets dropped too early, but everything is dropped late, an average of 30 seconds and at worst 60.
  • Truncate the other way, and nothing ever gets dropped late, but everything is dropped early, an average of 30 seconds and at worst 60.
  • Round at half, and things get dropped both early and late, but with an average of 0 seconds and a worst case of 30.

And you can of course use smaller buckets, like 100 buckets of 6-second intervals instead of 10 buckets of 1-minute intervals, to cut the error down as far as you like. Push that too far and you'll ruin the efficiency; a list of 600000 buckets of 1ms intervals is nearly as slow as a list of 1M entries.* But if you need 1 second or even 50ms, that's probably fine.

Here's a simple example:

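import time


class BucketQueue:
    def __init__(self):
        self.buckets = [[] for _ in range(10)]      # oldest first, newest last
        self.last_bucket_time = int(time.time() // 60)

    def prune_queue(self):
        now = int(time.time() // 60)                # current minute, as an int
        age = min(now - self.last_bucket_time, 10)  # minutes elapsed, capped at 10
        if age > 0:
            # Drop the `age` oldest buckets and add `age` empty ones on the right.
            self.buckets = self.buckets[age:] + [[] for _ in range(age)]
            self.last_bucket_time = now

    def enqueue(self, thing):
        self.prune_queue()
        self.buckets[-1].append(thing)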

* Of course you could combine this with the logarithmic data structure—a red-black tree of 600000 buckets is fine.
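To make the "smaller buckets" variant concrete, here is a rough sketch with a configurable bucket width. The class name `BucketedWindow`, its parameters, and the injectable `clock` argument are all assumptions made for the illustration. It floors timestamps to a bucket index, so items can be dropped up to one bucket width early.

```python
import time
from collections import deque


class BucketedWindow:
    """Hypothetical helper: keeps roughly `window_seconds` of items in fixed-size buckets."""

    def __init__(self, window_seconds=600, bucket_seconds=6, clock=time.time):
        self.bucket_seconds = bucket_seconds
        self.n_buckets = window_seconds // bucket_seconds
        self.clock = clock
        self.last_index = int(clock() // bucket_seconds)
        # A deque with maxlen discards the oldest bucket automatically on append.
        self.buckets = deque(([] for _ in range(self.n_buckets)), maxlen=self.n_buckets)

    def _rotate(self):
        index = int(self.clock() // self.bucket_seconds)
        elapsed = min(index - self.last_index, self.n_buckets)
        for _ in range(elapsed):
            self.buckets.append([])
        if elapsed > 0:
            self.last_index = index

    def add(self, item):
        self._rotate()
        self.buckets[-1].append(item)

    def items(self):
        self._rotate()
        return [item for bucket in self.buckets for item in bucket]


# Drive the window with a manual clock instead of real time.
t = [0.0]
w = BucketedWindow(window_seconds=600, bucket_seconds=6, clock=lambda: t[0])
w.add("a")          # t = 0
t[0] = 300.0
w.add("b")          # t = 300
t[0] = 599.0
before = w.items()  # "a" is still inside the window
t[0] = 603.0
after = w.items()   # the bucket holding "a" has been rotated out
print(before, after)  # ['a', 'b'] ['b']
```

Each rotation costs time proportional to the number of buckets that elapsed, not the number of stored items, which is where the efficiency of the bucket approach comes from.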

Upvotes: 1

abarnert

Reputation: 366203

If you're only ever appending to the end, and the values are always inherently in sorted order, you don't actually need a logarithmic data structure like a tree or heap at all; you can do a logarithmic search within any sorted random-access structure like a list or collections.deque.

The problem is that deleting everything up to an arbitrary point in a list or deque takes O(N) time. There's no reason that it should; you should be able to drop N elements off a deque in amortized constant time (with del q[:pos] or q.popleft(pos)), it's just that collections.deque doesn't do that. If you find or write a deque class that does have that feature, you could just write this:

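import bisect
import datetime

# `deque` here is the hypothetical deque class with slice deletion described
# above, not collections.deque (which raises TypeError on del q[:pos]).
q = deque()
now = datetime.datetime.now()
# Append oldest first so the deque stays sorted by timestamp.
q.append((now - datetime.timedelta(seconds=15), 'd'))
q.append((now - datetime.timedelta(seconds=10), 'c'))
q.append((now - datetime.timedelta(seconds=5), 'b'))
q.append((now, 'a'))

now = datetime.datetime.now()
# (cutoff,) sorts before any (cutoff, value) pair, so the tuples compare cleanly.
pos = bisect.bisect_left(q, (now - datetime.timedelta(seconds=10),))
del q[:pos]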

I'm not sure whether a deque like this exists on PyPI, but the C source to collections.deque is available to fork, or the Python source from PyPy, or you could wrap a C or C++ deque type, or write one from scratch…


Or, if you're expecting that the "current" values in the deque will always be a small subset of the total length, you can do it in O(M) time just by not using the deque destructively:

q = q[pos:]

In fact, in that case, you might as well just use a list; it has O(1) append on the right, and slicing the last M items off a list is about as low-overhead a way to copy M items as you're going to find.
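As a minimal sketch of that list-based variant, using only the standard library: the timestamps and item names below are made up, and a fixed start time is used so the result is reproducible.

```python
import bisect
import datetime

window = datetime.timedelta(minutes=10)
start = datetime.datetime(2024, 1, 1, 12, 0, 0)  # fixed, made-up start time

# (timestamp, value) pairs, appended in increasing timestamp order:
# 12:00, 12:05, 12:10, 12:15, 12:20, 12:25
q = [(start + datetime.timedelta(minutes=m), f"item-{m}") for m in range(0, 30, 5)]

now = start + datetime.timedelta(minutes=27)
cutoff = now - window  # 12:17

# (cutoff,) sorts before any (cutoff, value) pair, so this finds the index of
# the first entry whose timestamp is >= cutoff, in O(log N) comparisons.
pos = bisect.bisect_left(q, (cutoff,))
q = q[pos:]  # copies only the M surviving items

values = [value for _, value in q]
print(values)  # ['item-20', 'item-25']
```

Searching with the one-element tuple `(cutoff,)` avoids ever comparing a timestamp against a payload value, which matters if the payloads are not orderable.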

Upvotes: 2

abarnert

Reputation: 366203

One way to do this is to use a sorted tree structure, keyed on the timestamps. Then you can find the first element >= 10 minutes ago, and remove everything before that.

Using the bintrees library as an example (because its key-slicing syntax makes this very easy to read and write…):

import datetime

import bintrees  # third-party: pip install bintrees

q = bintrees.FastRBTree()
now = datetime.datetime.now()
q[now] = 'a'
q[now - datetime.timedelta(seconds=5)] = 'b'
q[now - datetime.timedelta(seconds=10)] = 'c'
q[now - datetime.timedelta(seconds=15)] = 'd'

now = datetime.datetime.now()
# Key slicing: delete every key strictly below the cutoff.
del q[:now - datetime.timedelta(seconds=10)]

That will remove everything up to, but not including, now-10s, which should be both c and d.

This way, finding the first element to remove takes log N time, and removing N elements below that should be average case amortized log N but worst case N. So, your overall worst case time complexity doesn't improve, but your average case does.

Of course the overhead of managing a tree instead of a deque is pretty high, and could easily be higher than the savings of N/log N steps if you're dealing with a pretty small queue.


There are other logarithmic data structures that may be more appropriate, like a priority queue/heap (as implemented by heapq in the stdlib), or a clock ring; I just chose a red-black tree because (with a PyPI module) it was the easiest one to demonstrate.
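For comparison, here is a rough sketch of the heapq alternative, assuming numeric timestamps in seconds. The helper names `push` and `prune` and the sample data are hypothetical. Unlike a plain deque, a heap tolerates entries that arrive out of timestamp order.

```python
import heapq
import itertools

_counter = itertools.count()  # tie-breaker so equal timestamps never compare values


def push(heap, timestamp, value):
    heapq.heappush(heap, (timestamp, next(_counter), value))


def prune(heap, now, window_seconds=600):
    """Pop entries older than the window and return the removed values."""
    removed = []
    cutoff = now - window_seconds
    # heap[0] is always the entry with the smallest timestamp.
    while heap and heap[0][0] < cutoff:
        removed.append(heapq.heappop(heap)[2])
    return removed


heap = []
push(heap, 1000.0, "a")
push(heap, 350.0, "d")  # arrives late, with an older timestamp
push(heap, 995.0, "b")
push(heap, 390.0, "c")

expired = prune(heap, now=1000.0)  # cutoff is 400.0
remaining = sorted(value for _, _, value in heap)
print(expired, remaining)  # ['d', 'c'] ['a', 'b']
```

Each push and each pop costs O(log N), so this only pays off over a deque when insertions are not already in time order.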

Upvotes: 4
