Reputation: 4253
I have a thread that runs a non-blocking event loop and other threads can put strings on a buffer for the event loop to write to a socket.
I want to accumulate the strings in a buffer, so that multiple small strings can be sent using one call to send.
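As a rough illustration of that goal, the sketch below joins several pending byte strings and hands them to the socket in one send call. It is written for Python 3, and send_coalesced is a hypothetical helper name, not part of the code in this question.

```python
def send_coalesced(sock, chunks):
    """Join pending chunks and pass them to the socket in a single send() call.

    send() may accept fewer bytes than offered, so the unsent tail is
    returned for the caller to retry later.
    """
    payload = b"".join(chunks)
    sent = sock.send(payload)
    return payload[sent:]
```

Joining copies every chunk once, which is the copying cost the rest of the question tries to avoid.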
Conceptually, the buffer needs to do 3 things.
I considered a few things.
StringIO would be really suitable for writing strings to the end, but no way to free data after it's written to the socket.
collections.deque of bytes. Very memory inefficient.
array.array. Easy to append strings. Copying slices for reading/dumping data.
My current code looks like this, but I'm not super happy with the copying and locking.
from array import array
from threading import Condition

class SendBuffer(object):

    def __init__(self, max_size):
        self.mark = 0
        self.buf = array('c')
        self.max_size = max_size
        self.full = Condition()

    def __len__(self):
        with self.full:
            return len(self.buf) - self.mark

    def write(self, data):
        with self.full:
            while len(self) >= self.max_size:
                # wait until data is written
                self.full.wait()
            self.buf.fromstring(data)

    def _peek(self):
        return buffer(self.buf, self.mark)

    def _written(self, n):
        self.mark += n
        self.full.notify_all()
        if self.mark >= len(self.buf):
            self.mark = 0
            self.buf = array('c')
        elif self.mark >= self.max_size:
            self.buf = self.buf[self.mark:]
            self.mark = 0

    def to_sock(self, sock):
        with self.full:
            data = self._peek()
            if data:
                n = sock.send(data)
                self._written(n)
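The class above uses array('c'), fromstring and buffer, which are Python 2 features. For readers on Python 3, here is a rough port that assumes bytearray and memoryview as stand-ins. The names are kept from the code above; the return value of to_sock is an addition made only so the sketch is easier to test.

```python
from threading import Condition

class SendBuffer:
    def __init__(self, max_size):
        self.mark = 0                  # offset of the first unsent byte
        self.buf = bytearray()
        self.max_size = max_size
        self.full = Condition()        # default lock is an RLock, so nesting is safe

    def __len__(self):
        with self.full:
            return len(self.buf) - self.mark

    def write(self, data):
        with self.full:
            while len(self) >= self.max_size:
                # wait until data is written
                self.full.wait()
            self.buf += data

    def _written(self, n):
        self.mark += n
        self.full.notify_all()
        if self.mark >= len(self.buf):
            # everything was sent: reuse the same bytearray
            self.buf.clear()
            self.mark = 0
        elif self.mark >= self.max_size:
            # drop the sent prefix in place instead of building a new array
            del self.buf[:self.mark]
            self.mark = 0

    def to_sock(self, sock):
        with self.full:
            if len(self) == 0:
                return 0
            # memoryview slices do not copy; both views are released on exit
            # so the bytearray can be resized again afterwards
            with memoryview(self.buf) as view, view[self.mark:] as chunk:
                n = sock.send(chunk)
            self._written(n)
            return n
```

The lock is still held during send, so this port addresses the copying concern but not the locking one.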
Upvotes: 4
Views: 1582
Reputation: 10266
Your problem is that your buffers, like StringIO, are only efficient to append to. Instead of appending to the end and deleting from the front when you're done processing, do the following:
Let's consider some cases:
The reader outpaces the writer(s): every write is immediately followed by a read of the same size, and the buffers swap position. Every write immediately goes out as a single packet.
The readers and writers are exactly in sync, or close enough with some jitter: multiple small writes get accumulated into the write buffer until the reader is done, and then they get sent out in chunks as large as the network will take.
The writer(s) outpace the reader. The write buffer will fill up while the reader is busy processing the read buffer. The reader will still send out as large chunks as the network will take, but you'll need to cap the writers in some way (typically by setting a maximum buffer size) and moderate them to avoid eating up limitless amounts of memory.
Remember that buffers are only a way to prevent stalling due to jitter. They don't help against mismatched producer/consumer speeds. In reality, your buffers will either be constantly full or constantly empty.
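The cases above describe a write buffer and a read buffer that swap position. One possible reading of that idea is sketched below for Python 3. DoubleBuffer and its method names are hypothetical, and the use of two bytearray objects is an assumption, not something spelled out in this answer.

```python
import threading

class DoubleBuffer:
    """Writers append to the front buffer; the reader drains the back buffer."""

    def __init__(self, max_size):
        self.max_size = max_size           # cap on bytes waiting in the front buffer
        self._front = bytearray()          # shared with writers, guarded by the lock
        self._back = bytearray()           # touched only by the reader thread
        self._sent = 0                     # bytes of the back buffer already sent
        self._cond = threading.Condition()

    def write(self, data):
        with self._cond:
            # moderate the writers while the front buffer is at capacity
            while len(self._front) >= self.max_size:
                self._cond.wait()
            self._front += data

    def _swap(self):
        with self._cond:
            self._back.clear()             # old data is fully sent, reuse the storage
            self._front, self._back = self._back, self._front
            self._sent = 0
            self._cond.notify_all()        # the front buffer is empty again

    def to_sock(self, sock):
        """Reader only. On a non-blocking socket, send() may raise BlockingIOError."""
        if self._sent >= len(self._back):
            self._swap()
        if not self._back:
            return 0
        # no lock is held here: writers never touch the back buffer
        with memoryview(self._back) as view, view[self._sent:] as chunk:
            n = sock.send(chunk)
        self._sent += n
        return n
```

In this sketch the lock is held only for an append or a swap, never across the send call, so writers are not stalled while the socket is busy.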
(*) Clearing a StringIO object is apparently not trivial; Google around a bit. You may want to create a new object instead of clearing, but that might lead to a lot of garbage which will need to be GC'ed if you have lots of context switches. Instead, you might also consider building your own clearable buffer using a combination of an array and an index variable, in which case clearing would come down to index = 0.
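A minimal sketch of such a clearable buffer, for Python 3, might look like this. ClearableBuffer is a hypothetical name, and a preallocated bytearray stands in for the array; both are assumptions made for illustration.

```python
class ClearableBuffer:
    """Fixed-capacity byte buffer whose clear() only resets an index."""

    def __init__(self, capacity):
        self._data = bytearray(capacity)   # allocated once and reused
        self._index = 0                    # number of valid bytes in _data

    def __len__(self):
        return self._index

    def append(self, data):
        end = self._index + len(data)
        if end > len(self._data):
            raise ValueError("buffer full")
        # same-length slice assignment overwrites in place, no reallocation
        self._data[self._index:end] = data
        self._index = end

    def view(self):
        # zero-copy view of the valid bytes, suitable for sock.send()
        return memoryview(self._data)[:self._index]

    def clear(self):
        self._index = 0                    # old bytes stay behind but are ignored
```

Because clearing leaves the storage untouched, no garbage is produced per swap; the cost is that the capacity has to be chosen up front.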
Upvotes: 2