Ulrich Eckhardt

Reputation: 17424

select-based socket loop without polling

I'm having problems writing a thread that transfers data between IO buffers and a socket. I can get it to run, just not the way I want it to. Here's a sketch of the code:

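from select import select
from socket import socket

s = socket(...)   # some connection
in_buffer = b''   # consumed by other thread
out_buffer = b''  # produced by other thread
while True:
    (r, w, x) = select([s], [s], [s])
    if r:
        data = s.recv(RECV_LIMIT)
        if not data:  # empty read: the peer closed the connection
            break
        in_buffer += data
    if w:
        sent = s.send(out_buffer)  # may send only part of the buffer
        out_buffer = out_buffer[sent:]
    if x:
        break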

The problem with this is that it consumes a full CPU core when idle. The reason is that the socket is writable most of the time, in particular when idle, so select() returns immediately, the loop does nothing, calls select() again, does nothing again, and so on. There's a simple fix: don't check for a writable socket when you have nothing to write:

... # ditto
while True:
    if out_buffer:
        (r, w, x) = select([s], [s], [s])
    else:
        (r, w, x) = select([s], [], [s])
    ... # ditto

This works, but it has a different problem: when idle, this blocks in select() indefinitely. If I add something to the output buffer, I need to somehow wake up the thread from the select() call, but how? For the record, my current workaround changes the evaluation slightly:

from time import sleep

while True:
    (r, w, x) = select([s], [s], [s])
    if x:
        break
    elif r:
        in_buffer += s.recv(RECV_LIMIT)
    elif w:
        if out_buffer:
            sent = s.send(out_buffer)
            out_buffer = out_buffer[sent:]
        else:
            sleep(0.001)  # nothing to do: back off for 1 ms instead of spinning

In short, when there is really nothing to do, insert a delay. Sleeping for a millisecond is enough to keep CPU usage below 1%. A similar approach would be to use a timeout for the select() call and then re-check whether there is output data. Still, neither solution is good, since both effectively boil down to polling, and polling sucks. So, how do I write an IO thread like this portably and without polling?

Note: One approach would be to add another file descriptor on which I would create artificial traffic in order to wake up the thread from the blocking select() call. The problem here is that select() can only be used portably on sockets, not e.g. on a pipe. Alternatively, on MS Windows, I could associate a Win32 event with a socket's state changes and another event to wake up the thread (see WSAEventSelect), but I don't want to write this code on top of the non-portable WinSock API either.

Upvotes: 4

Views: 3288

Answers (1)

Dan Lecocq

Reputation: 3493

It's a little unclear to me why you need this middleman working with select. Is this a constraint of your problem? If it is, it seems to me that you have to treat the output buffer as a resource that must be ready for reading before you even tell select that you're interested in writing.
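One way to act on that while keeping a single select() loop is sketched below. This is my own hedged sketch, not something from the question: it only puts the socket in select()'s write list while there is queued output, and it wakes the blocked select() through a private socket.socketpair() instead of a pipe. Since both ends of a socketpair are sockets, select() accepts them, and socketpair() is available on Windows since Python 3.5 (an assumption about your Python version). The class IOThread, its write()/stop() methods and the recv_limit default are hypothetical names chosen for illustration.

```python
import select
import socket
import threading


class IOThread:
    """Shuttle bytes between a socket and two byte buffers without polling.

    Hypothetical sketch: a private socketpair() serves as the wake-up
    channel, so producer threads can interrupt a blocking select().
    """

    def __init__(self, sock, recv_limit=4096):
        self.sock = sock
        self.recv_limit = recv_limit
        self.lock = threading.Lock()   # guards both buffers
        self.in_buffer = b''           # filled by the IO thread
        self.out_buffer = b''          # filled by producers via write()
        self._stop = threading.Event()
        # Both ends are sockets, so select() accepts them on every platform.
        self._wake_r, self._wake_w = socket.socketpair()
        self._wake_r.setblocking(False)
        self._wake_w.setblocking(False)

    def _wake(self):
        try:
            self._wake_w.send(b'\0')
        except BlockingIOError:
            pass  # wake-up channel is full, so a wake-up is already pending

    def write(self, data):
        """Called from producer threads: queue data and wake the IO thread."""
        with self.lock:
            self.out_buffer += data
        self._wake()

    def stop(self):
        self._stop.set()
        self._wake()

    def run(self):
        while not self._stop.is_set():
            with self.lock:
                # Only ask about writability when there is something to send.
                wlist = [self.sock] if self.out_buffer else []
            r, w, x = select.select([self.sock, self._wake_r], wlist, [self.sock])
            if x:
                break
            if self._wake_r in r:
                # Drain the wake-up bytes; they only serve to end select().
                try:
                    while self._wake_r.recv(1024):
                        pass
                except BlockingIOError:
                    pass
            if self.sock in r:
                data = self.sock.recv(self.recv_limit)
                if not data:  # peer closed the connection
                    break
                with self.lock:
                    self.in_buffer += data
            if self.sock in w:
                with self.lock:
                    pending = self.out_buffer
                # Send outside the lock so producers never wait on the network.
                sent = self.sock.send(pending)
                with self.lock:
                    # Producers only append, so dropping the first `sent`
                    # bytes removes exactly what was transmitted.
                    self.out_buffer = self.out_buffer[sent:]
```

Run it with something like threading.Thread(target=io.run, daemon=True).start(). When idle, the thread sits in select() with an empty write list, so it uses no CPU, and a call to write() from another thread wakes it immediately.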

It seems like this would be much simpler if you replaced your buffers with Queues of small strings that get passed along. That way, you can have two threads that interact with the socket:

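import queue
from select import select

in_buffer = queue.Queue()   # chunks received from the socket
out_buffer = queue.Queue()  # chunks waiting to be sent

# One thread consuming the socket
def reader(s):
    while True:
        (r, w, x) = select([s], [], [s])
        if x:
            break
        if r:
            data = s.recv(RECV_LIMIT)
            if not data:  # peer closed the connection
                break
            in_buffer.put(data)

# And one thread writing to the socket
def writer(s):
    while True:
        data = out_buffer.get()  # blocks, without polling, until data arrives
        (r, w, x) = select([], [s], [s])
        if x:
            break
        if w:
            s.sendall(data)  # unlike send(), sendall() writes every byte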

That way, the producing thread can safely signal that data is ready to be written: the writing thread simply blocks on out_buffer.get() until there is something to send. That said, select is a very low-level interface (as is socket, for that matter), and I would consider using an abstraction that offers a few more bells and whistles. I'm partial to gevent, though it's geared towards IO-bound applications and may not be a good fit if you're CPU-bound. With gevent, the producers and consumers can efficiently interact with the socket directly, removing the need for this middleman:

import gevent
from gevent import socket, sleep

def producer(sock):
    # We'll spit out some bytes every so often
    while True:
        sock.sendall(b'Hello from the producer!')
        sleep(0.01)

def consumer(sock):
    # We'll read some in as long as we can
    buf = b''
    while True:
        chunk = sock.recv(100)
        if not chunk:  # peer closed the connection
            break
        buf += chunk
        # If the buffer can be consumed, we'll consume it and reset
        if len(buf) > 500:
            print('Consuming buffer: %s' % buf)
            buf = b''

def client(sock):
    # This will emulate a client that prints what it receives, but always
    # sends the same message
    while True:
        sock.sendall(b'Hello from the client!')
        print(sock.recv(100))

# Run this to get the server going
listener = socket.socket()
listener.bind(('127.0.0.1', 5001))
listener.listen(5)
(sock, addr) = listener.accept()
gevent.joinall([
    gevent.spawn(producer, sock),
    gevent.spawn(consumer, sock)
])

# Run this (in a separate process) to get a client going
connector = socket.socket()
connector.connect(('127.0.0.1', 5001))
gevent.joinall([
    gevent.spawn(client, connector)
])
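If you'd rather stay with plain threads, here is a hedged, self-contained variant of the two-thread Queue design above. It assumes the socket is in blocking mode: then recv() and sendall() already block without spinning, so the select() calls can be dropped entirely. The RECV_LIMIT value and the None shutdown sentinel STOP are assumptions for illustration, not part of the code above.

```python
import queue
import socket
import threading

RECV_LIMIT = 4096  # hypothetical chunk size
STOP = None        # hypothetical sentinel that shuts a thread down


def reader(sock, in_queue):
    # recv() on a blocking socket sleeps until data arrives: no polling.
    while True:
        data = sock.recv(RECV_LIMIT)
        if not data:           # peer closed the connection
            in_queue.put(STOP)
            break
        in_queue.put(data)


def writer(sock, out_queue):
    # Queue.get() blocks until a producer puts something in, so an idle
    # writer consumes no CPU.
    while True:
        data = out_queue.get()
        if data is STOP:
            break
        sock.sendall(data)     # sendall() retries until every byte is sent
```

One thread runs reader(), another runs writer(); producers call out_queue.put(chunk) and consumers call in_queue.get(). Putting STOP into the output queue stops the writer, and the reader stops on its own when the peer closes the connection.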

Upvotes: 1
