wRAR

Reputation: 25569

Twisted wait for event in loop

I want to read and process some data from an external service. I ask the service if there is any data; if something was returned, I process it and ask again (so data can be processed immediately when it's available); otherwise, I wait for a notification that data is available. This can be written as an infinite loop:

def loop(self):
    while True:
        data = yield self.get_data_nonblocking()
        if data is not None:
            yield self.process_data(data)
        else:
            yield self.data_available

def on_data_available(self):
    self.data_available.fire()

How can data_available be implemented here? It could be a Deferred but a Deferred cannot be reset, only recreated. Are there better options?
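
For reference, the "recreate" variant I have in mind would look roughly like this (a sketch; it assumes the loop runs as an inlineCallbacks generator and that data_available starts out as a Deferred):

from twisted.internet.defer import Deferred, inlineCallbacks

@inlineCallbacks
def loop(self):
    while True:
        data = yield self.get_data_nonblocking()
        if data is not None:
            yield self.process_data(data)
        else:
            yield self.data_available

def on_data_available(self):
    # Fire the current Deferred and replace it with a fresh one.
    d, self.data_available = self.data_available, Deferred()
    d.callback(None)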

Can this loop be integrated into the Twisted event loop? I can read and process data right in on_data_available and write some code instead of the loop checking get_data_nonblocking, but I feel like I'll then need some locks to make sure data is processed in the same order it arrives (the code above enforces this because it's the only place where it's processed). Is this a good idea at all?

Upvotes: 1

Views: 1208

Answers (1)

Jean-Paul Calderone

Reputation: 48345

Consider the case of a TCP connection. The receiver buffer for a TCP connection can either have data in it or not. You can get that data, or get nothing, without blocking by using the non-blocking socket API:

try:
    data = socket.recv(1024)
except BlockingIOError:
    data = None
if data:
    self.process_data(data)

You can wait for data to be available using select() (or any of the basically equivalent APIs):

socket.setblocking(False)
while True:
    try:
        data = socket.recv(1024)
    except BlockingIOError:
        # Nothing in the receive buffer right now.
        data = None
    if data:
        self.process_data(data)
    else:
        select([socket], [], [])

Of these, only select() is particularly Twisted-unfriendly (though the Twisted idiom is certainly not to make your own socket.recv calls). You could replace the select call with a Twisted-friendly version though (implement a Protocol with a dataReceived method that fires a Deferred - sort of like your on_data_available method - toss in some yields and make this whole thing an inlineCallbacks generator).
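
A rough sketch of that Twisted-friendly replacement might look like this (the names BufferingProtocol and wait_for_data are mine, and the buffering is deliberately simplistic):

from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.protocol import Protocol

class BufferingProtocol(Protocol):
    # Buffer received bytes and fire a Deferred when new bytes arrive.
    def __init__(self):
        self._buffer = b""
        self._waiting = None

    def dataReceived(self, data):
        self._buffer += data
        if self._waiting is not None:
            waiting, self._waiting = self._waiting, None
            waiting.callback(None)

    def get_data_nonblocking(self):
        # Hand back whatever has been buffered, or None if there's nothing.
        data, self._buffer = self._buffer, b""
        return data or None

    def wait_for_data(self):
        self._waiting = Deferred()
        return self._waiting

@inlineCallbacks
def loop(proto, process_data):
    while True:
        data = proto.get_data_nonblocking()
        if data is not None:
            yield process_data(data)
        else:
            yield proto.wait_for_data()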

But though that's one way you can get data from a TCP connection, that's not the API that Twisted encourages you to use to do so. Instead, the API is:

class SomeProtocol(Protocol):
    def dataReceived(self, data):
        # Your logic here
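
Hooking such a protocol up to a connection is the usual endpoint business, something like this (purely illustrative; the host and port are placeholders):

from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

endpoint = TCP4ClientEndpoint(reactor, "data.example.invalid", 1234)
connectProtocol(endpoint, SomeProtocol())
reactor.run()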

I don't see how your case is substantially different. What if, instead of the loop you wrote, you did something like this:

class YourDataProcessor(object):
    def process_data(self, data):
        # Your logic here

class SomeDataGetter(object):
    def __init__(self, processor):
        self.processor = processor

    def on_available_data(self):
        data = self.get_data_nonblocking()
        if data is not None:
            self.processor.process_data(data)

Now there are no Deferreds at all (except perhaps in whatever implements on_available_data or get_data_nonblocking but I can't see that code).
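
Wiring that up is then just a couple of lines (assuming something else knows when to deliver the notification):

processor = YourDataProcessor()
getter = SomeDataGetter(processor)

# Whatever delivers the "data is available" notification calls:
getter.on_available_data()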

If you leave this roughly as-is, you are guaranteed in-order execution because Twisted is single-threaded (except in a couple places that are very clearly marked) and, in a single-threaded program, an earlier call to process_data must complete before any later call to process_data can be made (excepting, of course, the case where process_data reentrantly invokes itself - but that's another story).

If you switch this back to using inlineCallbacks (or any equivalent "coroutine" flavored drink mix) then you are probably introducing the possibility of out-of-order execution.

For example, if get_data_nonblocking returns a Deferred and you write something like this:

    @inlineCallbacks
    def on_available_data(self):
        data = yield self.get_data_nonblocking()
        if data is not None:
            self.processor.process_data(data)

Then you have changed on_available_data to say that a context switch is allowed when calling get_data_nonblocking. In this case, depending on your implementation of get_data_nonblocking and on_available_data, it's entirely possible that:

  1. on_available_data is called
  2. get_data_nonblocking is called and returns a Deferred
  3. on_available_data tells execution to switch to another context (via yield / inlineCallbacks)
  4. on_available_data is called again
  5. get_data_nonblocking is called again and returns a Deferred (perhaps the same one! perhaps a new one! depends on how it's implemented)
  6. The second invocation of on_available_data tells execution to switch to another context (same reason)
  7. The reactor spins around for a while and eventually an event arrives that causes the Deferred returned by the second invocation of get_data_nonblocking to fire.
  8. Execution switches back to the second on_available_data frame
  9. process_data is called with whatever data the second get_data_nonblocking call returned
  10. Eventually the same things happen to the first set of objects and process_data is called again with whatever data the first get_data_nonblocking call returned

Now perhaps you've processed data out of order - again, this depends on more details of other parts of your system.
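
To make that concrete, here's a self-contained sketch (with a made-up get_data_nonblocking that hands out a fresh Deferred per call) in which data ends up processed in the order the Deferreds fire, not the order the calls were made:

from twisted.internet.defer import Deferred, inlineCallbacks

class OutOfOrderDemo(object):
    def __init__(self):
        self.pending = []
        self.processed = []

    def get_data_nonblocking(self):
        d = Deferred()
        self.pending.append(d)
        return d

    @inlineCallbacks
    def on_available_data(self):
        data = yield self.get_data_nonblocking()
        if data is not None:
            self.processed.append(data)

demo = OutOfOrderDemo()
demo.on_available_data()            # first invocation suspends on the first Deferred
demo.on_available_data()            # second invocation suspends on the second Deferred
demo.pending[1].callback("second")  # the second Deferred fires first...
demo.pending[0].callback("first")
print(demo.processed)               # ['second', 'first'] - out of order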

If so, you can always re-impose order. There are a lot of different possible approaches to this. Twisted itself doesn't come with any APIs explicitly in support of this operation, so the solution involves writing some new code. Here's one idea (untested) for an approach - a queue-like class that knows about object sequence numbers:

from twisted.internet.defer import Deferred

class SequencedQueue(object):
    """
    A queue-like type which guarantees objects come out of the queue in the order
    defined by a sequence number associated with the objects when they are put into
    the queue.

    Application code manages sequence number assignment so that sequence numbers don't
    have to have the same order as `put` calls on this type.
    """
    def __init__(self):
        # The sequence number of the object that should be given out
        # by the next call to `get`
        self._next_sequence = 0

        # The sequence number of the next result that needs to be provided.
        self._next_result = 0

        # A holding area for objects that have been put but not yet delivered.
        self._queue = {}

        # A holding area for Deferreds handed out by `get`, keyed by sequence number.
        self._waiting = {}

    def put(self, sequence, object):
        """
        Put an object into the queue at a particular point in the sequence.
        """
        if sequence < self._next_result:
            # Programming error.  The sequence number of the
            # object being put has already been delivered.
            raise ValueError("sequence number already delivered")

        self._queue[sequence] = object
        self._check_waiters()

    def get(self):
        """
        Get an object from the queue which has the next sequence number
        following whatever was previously gotten.
        """
        result = self._waiting[self._next_sequence] = Deferred()
        self._next_sequence += 1
        self._check_waiters()
        return result

    def _check_waiters(self):
        """
        Find any Deferreds previously given out by get calls which can now be given
        their results and give them to them.
        """
        while True:
            seq = self._next_result
            if seq in self._queue and seq in self._waiting:
                self._next_result += 1
                # XXX Probably a re-entrancy bug here.  If a callback calls back in to
                # put then this loop might run recursively
                self._waiting.pop(seq).callback(self._queue.pop(seq))
            else:
                break

The expected behavior (modulo any bugs I accidentally added) is something like:

q = SequencedQueue()
d1 = q.get()
d2 = q.get()
q.put(1, "second result")
# Nothing in particular happens yet because result 0 hasn't arrived.
q.put(0, "first result")
# Now d1 fires with "first result" and afterwards d2 fires with "second result".

Using this, just make sure you assign sequence numbers in the order you want data dispatched rather than the order it actually shows up somewhere. For example:

    @inlineCallbacks
    def on_available_data(self):
        sequence = self._process_order
        data = yield self.get_data_nonblocking()
        if data is not None:
            self._process_order += 1
            self.sequenced_queue.put(sequence, data)

Elsewhere, some code can consume the queue sort of like:

@inlineCallbacks
def queue_consumer(self):
    while True:
        data = yield self.sequenced_queue.get()
        yield self.process_data(data)

Upvotes: 3
