Roger Dahl

Reputation: 15734

Efficient FIFO queue for arbitrarily sized chunks of bytes in Python

How do I implement a FIFO buffer to which I can efficiently add arbitrarily sized chunks of bytes at the head, and from which I can efficiently pop arbitrarily sized chunks of bytes from the tail?

Background:

I have a class that reads bytes from file-like objects in chunks of arbitrary size and is itself a file-like object from which clients can read the bytes in chunks of arbitrary size.

The way I have this implemented is that whenever a client wants to read a chunk of bytes, the class will repeatedly read from the underlying file-like objects (with chunk sizes appropriate to those objects) and add the bytes to the head of a FIFO queue until there are enough bytes in the queue to serve a chunk of the requested size to the client. It then pops those bytes off of the tail of the queue and returns them to the client.
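
For concreteness, here is a minimal sketch of that arrangement (the class name ChunkAdapter and its attributes are hypothetical, and the FIFO is assumed to expose write(), read(), and __len__(), as some of the implementations in the answers below do):

class ChunkAdapter:
    """Hypothetical adapter: reads large chunks from an underlying
    file-like `source` and serves arbitrarily sized reads to clients."""

    def __init__(self, source, fifo, source_chunk_size=1024 * 1024):
        self.source = source                    # underlying file-like object
        self.fifo = fifo                        # any FIFO from the answers below
        self.source_chunk_size = source_chunk_size

    def read(self, size):
        # Refill the FIFO until it can serve the requested size,
        # or the underlying source is exhausted.
        while len(self.fifo) < size:
            data = self.source.read(self.source_chunk_size)
            if not data:
                break
            self.fifo.write(data)
        return self.fifo.read(min(size, len(self.fifo)))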

I have a performance problem that occurs when the chunk size for the underlying file-like objects is much larger than the chunk size that clients use when reading from the class.

Say the chunk size for the underlying file-like objects is 1 MiB and the chunk size the client reads with is 1 KiB. The first time the client requests 1 KiB, the class has to read 1 MiB and add it to the FIFO queue. Then, for that request and the subsequent 1023 requests, the class has to pop 1 KiB from the tail of the FIFO queue, which gradually decreases in size from 1 MiB to 0 bytes, at which time the cycle starts again.

I have currently implemented this with a StringIO object. Writing new bytes to the end of the StringIO object is fast, but removing bytes from the beginning is very slow, because a new StringIO object that holds a copy of the entire previous buffer minus the first chunk of bytes must be created.
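
The pattern in question, illustrated with plain bytes for brevity (a sketch, not the actual class): every pop from the front re-copies the entire remaining buffer.

buf = b"x" * (1024 * 1024)               # 1 MiB pending in the buffer
chunk, buf = buf[:1024], buf[1024:]      # popping 1 KiB copies the remaining ~1 MiB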

SO questions that deal with similar issues tend to point to the deque container. However, deque is implemented as a doubly linked list. Writing a chunk to the deque would require splitting the chunk into objects, each containing a single byte. The deque would then add two pointers to each object for storage, probably increasing the memory requirements by at least an order of magnitude compared to the raw bytes. Also, it would take a long time to traverse the linked list and deal with each object, both to split chunks into objects and to join objects into chunks.

Upvotes: 25

Views: 12876

Answers (6)

Mateen Ulhaq

Reputation: 27241

Benchmarks

...And the winner is @Cameron's answer.

FifoBufferedReaderWriter                 t=0.009697
FifoFileBuffer                           t=0.005137

Code:

from collections import deque
from io import BytesIO
from timeit import timeit


class FifoBufferedReaderWriter:
    """
    https://stackoverflow.com/a/78895090/365102
    """

    def __init__(self, chunk_size=1024):
        self.available = 0
        self.chunk_size = chunk_size
        self._queue = deque()
        self._write_buffer = BytesIO()

    def __len__(self):
        return self.available

    def write(self, data: bytes, *, flush=False):
        offset = 0
        buf_size = self._write_buffer.tell()

        if buf_size > 0:
            # Fill up the buffer.
            chunk = data[: self.chunk_size - buf_size]
            offset += len(chunk)
            self._write_buffer.write(chunk)

        if self._write_buffer.tell() == self.chunk_size:
            # Flush the buffer if it is full.
            self._flush_write_buffer()

        # Split off as many whole chunk_size pieces as `data` still contains;
        # the remainder (less than one chunk) is handled below.
        stop_offset = offset + (len(data) - offset) // self.chunk_size * self.chunk_size

        while offset < stop_offset:
            # Write data in chunks of chunk_size.
            chunk = data[offset : offset + self.chunk_size]
            offset += len(chunk)
            self._queue_append(chunk)

        # Write any remaining data.
        chunk = data[offset:]

        if len(chunk) == 0:
            return

        if flush:
            self._queue_append(chunk)
        else:
            self._write_buffer.write(chunk)

    def flush(self):
        self._flush_write_buffer()

    def read(self, size: int):
        with BytesIO() as buffer:
            self._read_into_buffer(size, buffer)
            return buffer.getvalue()

    def _queue_append(self, chunk):
        self._queue.append(chunk)
        self.available += len(chunk)

    def _queue_appendleft(self, chunk):
        self._queue.appendleft(chunk)
        self.available += len(chunk)

    def _queue_popleft(self):
        chunk = self._queue.popleft()
        self.available -= len(chunk)
        return chunk

    def _flush_write_buffer(self):
        chunk = self._write_buffer.getvalue()
        self._write_buffer.seek(0)
        self._write_buffer.truncate()
        self._queue_append(chunk)

    def _read_into_buffer(self, size, buffer):
        buffer_size = 0

        # Assemble chunks into output buffer.
        while buffer_size < size:
            try:
                chunk = self._queue_popleft()
            except IndexError:  # Or queue.Empty, if using queue.Queue.
                raise RuntimeError("Not enough data to read.")
            needed = size - buffer_size

            if len(chunk) > needed:
                self._queue_appendleft(chunk[needed:])
                chunk = chunk[:needed]

            buffer.write(chunk)
            buffer_size += len(chunk)


class FifoFileBuffer(object):
    """
    https://stackoverflow.com/a/10917767/365102
    """

    def __init__(self):
        self.buf = BytesIO()
        self.available = 0  # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size=None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result

    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = BytesIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write(b"0" * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

    def flush(self):
        pass


def test(stream):
    k1 = 100
    k2 = 100
    write_size = 1024
    read_size = 512
    for _ in range(k1):
        for _ in range(k2):
            stream.write(b"-" * write_size)
        stream.flush()
        for _ in range(k2):
            stream.read(write_size - read_size)
    stream.read(stream.available)


def main():
    n = 10

    funcs = [
        FifoBufferedReaderWriter,
        FifoFileBuffer,
    ]

    for func in funcs:
        stream = func()
        t = timeit("test(stream)", number=n, globals={"test": test, "stream": stream})
        print(f"{func.__name__:<40} t={t / n:.6f}")


if __name__ == "__main__":
    main()

Upvotes: 1

Mateen Ulhaq

Reputation: 27241

Below is a simple implementation of a buffered FIFO for byte streams. It assembles chunks of chunk_size into a deque. It can be made thread-safe by swapping the deque for a queue.Queue() and guarding updates to self.available with a lock; see the sketch after the class.

from collections import deque
from io import BytesIO


class FifoBufferedReaderWriter:
    def __init__(self, chunk_size=1024):
        self.available = 0
        self.chunk_size = chunk_size
        self._queue = deque()
        self._write_buffer = BytesIO()
        self._read_chunks = deque()
        self._read_size = 0

    def __len__(self):
        return self.available

    def write(self, data: bytes, *, flush=False):
        offset = 0
        buf_size = self._write_buffer.tell()

        if buf_size > 0:
            # Fill up the buffer.
            chunk = data[: self.chunk_size - buf_size]
            offset += len(chunk)
            self._write_buffer.write(chunk)

        if self._write_buffer.tell() == self.chunk_size:
            # Flush the buffer if it is full.
            self._flush_write_buffer()

        # Split off as many whole chunk_size pieces as `data` still contains;
        # the remainder (less than one chunk) is handled below.
        stop_offset = offset + (len(data) - offset) // self.chunk_size * self.chunk_size

        while offset < stop_offset:
            # Write data in chunks of chunk_size.
            chunk = data[offset : offset + self.chunk_size]
            offset += len(chunk)
            self._queue_append(chunk)

        # Write any remaining data.
        chunk = data[offset:]

        if len(chunk) == 0:
            return

        if flush:
            self._queue_append(chunk)
        else:
            self._write_buffer.write(chunk)

    def flush(self):
        self._flush_write_buffer()

    def read(self, size: int):
        with BytesIO() as buffer:
            self._read_from_queue(size)
            self._read_into_buffer(size, buffer)
            return buffer.getvalue()

    def _queue_append(self, chunk):
        self._queue.append(chunk)
        self.available += len(chunk)

    def _queue_popleft(self):
        chunk = self._queue.popleft()
        self.available -= len(chunk)
        return chunk

    def _flush_write_buffer(self):
        chunk = self._write_buffer.getvalue()
        self._write_buffer.seek(0)
        self._write_buffer.truncate()
        self._queue_append(chunk)

    def _read_from_queue(self, size):
        """Pull data from the queue until we have enough to read."""
        while self._read_size < size:
            try:
                chunk = self._queue_popleft()
            except IndexError:  # Or queue.Empty, if using queue.Queue.
                raise RuntimeError("Not enough data to read.")
            self._read_chunks.append(chunk)
            self._read_size += len(chunk)

    def _read_into_buffer(self, size, buffer):
        buffer_size = 0

        # Assemble chunks into output buffer.
        while buffer_size < size:
            chunk = self._read_chunks.popleft()
            self._read_size -= len(chunk)
            needed = size - buffer_size

            if len(chunk) > needed:
                self._read_chunks.appendleft(chunk[needed:])
                self._read_size += len(chunk) - needed
                chunk = chunk[:needed]

            buffer.write(chunk)
            buffer_size += len(chunk)
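
A minimal sketch of that thread-safe variant (the subclass name ThreadSafeFifoBufferedReaderWriter and the _lock attribute are illustrative):

import queue
import threading


class ThreadSafeFifoBufferedReaderWriter(FifoBufferedReaderWriter):
    """Sketch: a queue.Queue replaces the deque, and a lock guards
    the shared `available` counter."""

    def __init__(self, chunk_size=1024):
        super().__init__(chunk_size)
        self._queue = queue.Queue()
        self._lock = threading.Lock()

    def _queue_append(self, chunk):
        self._queue.put(chunk)
        with self._lock:
            self.available += len(chunk)

    def _queue_popleft(self):
        try:
            chunk = self._queue.get_nowait()
        except queue.Empty:
            # Preserve the IndexError contract expected by _read_from_queue.
            raise IndexError from None
        with self._lock:
            self.available -= len(chunk)
        return chunk

Note that this only serializes the queue and the counter; the per-instance _read_chunks deque still assumes a single reader.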

Upvotes: 0

Eddy Xiao

Reputation: 313

... but removing bytes from the beginning is very slow, because a new StringIO object that holds a copy of the entire previous buffer minus the first chunk of bytes must be created.

This type of slowness can be overcome by using bytearray in Python >= 3.4. See the discussion in this issue; the patch is here.

The key is: removing head byte(s) from bytearray by

a[:1] = b''   # O(1) (amortized)

is much faster than

a = a[1:]     # O(len(a))

when len(a) is huge (say 10**6).

A bytearray also provides a convenient way to preview the whole data set as an array (i.e., itself), in contrast to the deque container, which needs to join its items back into a chunk.

Now an efficient FIFO can be implemented as follows:

class byteFIFO:
    """ byte FIFO buffer """
    def __init__(self):
        self._buf = bytearray()

    def put(self, data):
        self._buf.extend(data)

    def get(self, size):
        data = self._buf[:size]
        # The fast delete syntax
        self._buf[:size] = b''
        return data

    def peek(self, size):
        return self._buf[:size]

    def getvalue(self):
        # peek with no copy
        return self._buf

    def __len__(self):
        return len(self._buf)

Benchmark

import time
bfifo = byteFIFO()
bfifo.put(b'a'*1000000)        # a very long array
t0 = time.time()
for k in range(1000000):
    d = bfifo.get(4)           # "pop" from head
    bfifo.put(d)               # "push" in tail
print('t = ', time.time()-t0)  # t = 0.897 on my machine

The circular/ring buffer implementation in Cameron's answer needs 2.378 sec, and his/her original implementation needs 1.108 sec.

Upvotes: 15

vartec

Reputation: 134641

I have currently implemented this with a StringIO object. Writing new bytes to the end of the StringIO object is fast, but removing bytes from the beginning is very slow, because a new StringIO object that holds a copy of the entire previous buffer minus the first chunk of bytes must be created.

Actually, the most typical way of implementing a FIFO is to use a wrap-around (circular) buffer with two pointers, like so:

(Diagram: a circular buffer with separate read and write pointers; image source linked in the original answer.)

Now you can implement that with StringIO(), using .seek() to read and write at the appropriate locations.
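
Here is a minimal sketch of that idea with a fixed capacity, using BytesIO for Python 3 (the RingBuffer name and the 1 MiB default are illustrative, and there is no automatic growth; Cameron's answer below adds that):

from io import BytesIO


class RingBuffer:
    """Sketch of a fixed-capacity wrap-around buffer with two pointers."""

    def __init__(self, capacity=1024 * 1024):
        self.capacity = capacity
        self.buf = BytesIO(b"\x00" * capacity)
        self.read_fp = 0    # next position to read from
        self.write_fp = 0   # next position to write to
        self.available = 0  # bytes currently stored

    def write(self, data):
        if len(data) > self.capacity - self.available:
            raise BufferError("not enough free space")
        self.buf.seek(self.write_fp)
        head = min(len(data), self.capacity - self.write_fp)
        self.buf.write(data[:head])
        if head < len(data):              # wrap around to the start
            self.buf.seek(0)
            self.buf.write(data[head:])
        self.write_fp = (self.write_fp + len(data)) % self.capacity
        self.available += len(data)

    def read(self, size):
        size = min(size, self.available)
        self.buf.seek(self.read_fp)
        result = self.buf.read(min(size, self.capacity - self.read_fp))
        if len(result) < size:            # wrap around to the start
            self.buf.seek(0)
            result += self.buf.read(size - len(result))
        self.read_fp = (self.read_fp + size) % self.capacity
        self.available -= size
        return result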

Upvotes: 18

Cameron

Reputation: 98826

Update: Here's an implementation of the circular buffer technique from vartec's answer (building on my original answer, preserved below for those curious):

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

Original answer (superseded by the one above):

You can use a buffer and track the start index (read file pointer), occasionally compacting it when it gets too large (this should yield pretty good amortized performance).

For example, wrap a StringIO object like so:

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        return self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)

Upvotes: 13

AKX

Reputation: 169298

Can you assume anything about the expected read/write amounts?

Chunking the data into, for example, 1024-byte fragments and using deque[1] might then work better; you could just read N full chunks, then one last chunk to split, putting the remainder back on the start of the queue (see the sketch after the excerpt below).

1) collections.deque

class collections.deque([iterable[, maxlen]])

Returns a new deque object initialized left-to-right (using append()) with data from iterable. If iterable is not specified, the new deque is empty.

Deques are a generalization of stacks and queues (the name is pronounced “deck” and is short for “double-ended queue”). Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction. ...
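
A minimal sketch of that split-and-push-back step (the helper name read_exact is illustrative):

from collections import deque


def read_exact(q: deque, size: int) -> bytes:
    """Pop whole chunks, split the last one, push the remainder back."""
    parts, have = [], 0
    while have < size:
        chunk = q.popleft()                  # IndexError if not enough data
        needed = size - have
        if len(chunk) > needed:
            q.appendleft(chunk[needed:])     # remainder goes back on the left
            chunk = chunk[:needed]
        parts.append(chunk)
        have += len(chunk)
    return b"".join(parts)

For example, with q = deque([b"abcd", b"efgh"]), read_exact(q, 6) returns b"abcdef" and leaves deque([b"gh"]).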

Upvotes: 5
