Reputation: 25569
I want to read and process some data from an external service. I ask the service if there is any data, if something was returned I process it and ask again (so data can be processed immediately when it's available) and otherwise I wait for a notification that data is available. This can be written as an infinite loop:
def loop(self):
    while True:
        data = yield self.get_data_nonblocking()
        if data is not None:
            yield self.process_data(data)
        else:
            yield self.data_available

def on_data_available(self):
    self.data_available.fire()
How can data_available be implemented here? It could be a Deferred, but a Deferred cannot be reset, only recreated. Are there better options?
Can this loop be integrated into the Twisted event loop? I can read and process data right in on_data_available and write some code instead of the loop checking get_data_nonblocking, but I feel like then I'll need some locks to make sure data is processed in the same order it arrives (the code above enforces it because it's the only place where it's processed). Is this a good idea at all?
Upvotes: 1
Views: 1208
Reputation: 48345
Consider the case of a TCP connection. The receive buffer for a TCP connection can either have data in it or not. You can get that data, or get nothing, without blocking by using the non-blocking socket API:
data = socket.recv(1024)
if data:
    self.process_data(data)
You can wait for data to be available using select() (or any of the basically equivalent APIs):
socket.setblocking(False)
while True:
    data = socket.recv(1024)
    if data:
        self.process_data(data)
    else:
        select([socket], [], [])
Of these, only select() is particularly Twisted-unfriendly (though the Twisted idiom is certainly not to make your own socket.recv calls). You could replace the select call with a Twisted-friendly version though (implement a Protocol with a dataReceived method that fires a Deferred - sort of like your on_data_available method - toss in some yields and make this whole thing an inlineCallbacks generator).
But though that's one way you can get data from a TCP connection, that's not the API that Twisted encourages you to use to do so. Instead, the API is:
class SomeProtocol(Protocol):
    def dataReceived(self, data):
        # Your logic here
        pass
I don't see how your case is substantially different. What if, instead of the loop you wrote, you did something like this:
class YourDataProcessor(object):
    def process_data(self, data):
        # Your logic here
        pass

class SomeDataGetter(object):
    def __init__(self, processor):
        self.processor = processor

    def on_available_data(self):
        data = self.get_data_nonblocking()
        if data is not None:
            self.processor.process_data(data)
Now there are no Deferreds at all (except perhaps in whatever implements on_available_data or get_data_nonblocking, but I can't see that code).
If you leave this roughly as-is, you are guaranteed in-order execution because Twisted is single-threaded (except in a couple of places that are very clearly marked), and in a single-threaded program an earlier call to process_data must complete before any later call to process_data can be made (excepting, of course, the case where process_data reentrantly invokes itself - but that's another story).
If you switch this back to using inlineCallbacks (or any equivalent "coroutine" flavored drink mix) then you are probably introducing the possibility of out-of-order execution.
For example, if get_data_nonblocking returns a Deferred and you write something like this:
@inlineCallbacks
def on_available_data(self):
    data = yield self.get_data_nonblocking()
    if data is not None:
        self.processor.process_data(data)
Then you have changed on_available_data to say that a context switch is allowed when calling get_data_nonblocking. In this case, depending on your implementation of get_data_nonblocking and on_available_data, it's entirely possible that:
1. on_available_data is called
2. get_data_nonblocking is called and returns a Deferred
3. on_available_data tells execution to switch to another context (via yield / inlineCallbacks)
4. on_available_data is called again
5. get_data_nonblocking is called again and returns a Deferred (perhaps the same one! perhaps a new one! depends on how it's implemented)
6. The second on_available_data tells execution to switch to another context (same reason)
7. Something eventually causes the Deferred returned by the second invocation of get_data_nonblocking to fire
8. Execution switches back to the second on_available_data frame
9. process_data is called with whatever data the second get_data_nonblocking call returned
10. Eventually process_data is called again with whatever data the first get_data_nonblocking call returned
Now perhaps you've processed data out of order - again, this depends on more details of other parts of your system.
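The interleaving described above can be reproduced in a few lines. This sketch deliberately uses asyncio from the standard library rather than Twisted, purely so that it runs without extra dependencies; `await` plays the role of `yield` under inlineCallbacks and an asyncio future plays the role of a Deferred. The `pending` and `processed` lists are hypothetical helpers for the demonstration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    pending = []    # futures handed out by get_data_nonblocking, in call order
    processed = []  # what "process_data" saw, in the order it saw it

    def get_data_nonblocking():
        # Hypothetical: each call returns a new, not-yet-resolved future.
        fut = loop.create_future()
        pending.append(fut)
        return fut

    async def on_available_data():
        data = await get_data_nonblocking()  # a context switch is allowed here
        if data is not None:
            processed.append(data)           # stands in for process_data

    first = asyncio.ensure_future(on_available_data())
    second = asyncio.ensure_future(on_available_data())
    await asyncio.sleep(0)  # let both calls run up to their await

    # The result for the second call happens to arrive first.
    pending[1].set_result("second")
    await second
    pending[0].set_result("first")
    await first
    return processed


result = asyncio.run(main())
print(result)  # ['second', 'first']
```

Nothing here is a bug in the event loop: each call resumes as soon as its own result is ready, so the processing order follows result arrival rather than call order.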
If so, you can always re-impose order. There are a lot of different possible approaches to this. Twisted itself doesn't come with any APIs that are explicitly in support of this operation so the solution involves writing some new code. Here's one idea (untested) for an approach - a queue-like class that knows about object sequence numbers:
from twisted.internet.defer import Deferred


class SequencedQueue(object):
    """
    A queue-like type which guarantees objects come out of the queue in the order
    defined by a sequence number associated with the objects when they are put into
    the queue.

    Application code manages sequence number assignment so that sequence numbers don't
    have to have the same order as `put` calls on this type.
    """
    def __init__(self):
        # The sequence number of the object that should be given out
        # by the next call to `get`
        self._next_sequence = 0

        # The sequence number of the next result that needs to be provided.
        self._next_result = 0

        # A holding area for objects that have been put but not yet given out
        self._queue = {}

        # A holding area for Deferreds given out by `get` that are still
        # waiting for their objects
        self._waiting = {}

    def put(self, sequence, object):
        """
        Put an object into the queue at a particular point in the sequence.
        """
        if sequence < self._next_result:
            # Programming error.  The sequence number
            # of the object being put has already been used.
            # (ValueError is an arbitrary choice of exception type.)
            raise ValueError("sequence number %r has already been used" % (sequence,))

        self._queue[sequence] = object
        self._check_waiters()

    def get(self):
        """
        Get an object from the queue which has the next sequence number
        following whatever was previously gotten.
        """
        result = self._waiting[self._next_sequence] = Deferred()
        self._next_sequence += 1
        self._check_waiters()
        return result

    def _check_waiters(self):
        """
        Find any Deferreds previously given out by get calls which can now be given
        their results and give them to them.
        """
        while True:
            seq = self._next_result
            if seq in self._queue and seq in self._waiting:
                self._next_result += 1
                # XXX Probably a re-entrancy bug here.  If a callback calls back in to
                # put then this loop might run recursively
                self._waiting.pop(seq).callback(self._queue.pop(seq))
            else:
                break
The expected behavior (modulo any bugs I accidentally added) is something like:
q = SequencedQueue()
d1 = q.get()
d2 = q.get()
# Nothing in particular happens
q.put(1, "second result")
# d1 fires with "first result" and afterwards d2 fires with "second result"
q.put(0, "first result")
Using this, just make sure you assign sequence numbers in the order you want data dispatched rather than the order it actually shows up somewhere. For example:
@inlineCallbacks
def on_available_data(self):
    sequence = self._process_order
    data = yield self.get_data_nonblocking()
    if data is not None:
        self._process_order += 1
        self.sequenced_queue.put(sequence, data)
Elsewhere, some code can consume the queue sort of like:
@inlineCallbacks
def queue_consumer(self):
    while True:
        yield self.process_data(yield self.sequenced_queue.get())
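
If the consumer does not need to pull items through Deferreds, the same re-ordering idea can be sketched in a simpler push style. This is only an illustration of the sequence-number technique in plain Python, not a Twisted API; the name `ReorderBuffer` and its `deliver` callback are hypothetical.

```python
class ReorderBuffer(object):
    """
    Hold items that arrive early and hand them to `deliver` strictly in
    sequence-number order, starting from 0.
    """
    def __init__(self, deliver):
        self._deliver = deliver  # called once per item, in sequence order
        self._next = 0           # next sequence number to deliver
        self._held = {}          # items that arrived ahead of their turn

    def put(self, sequence, item):
        if sequence < self._next or sequence in self._held:
            # The sequence number has already been used.
            raise ValueError("duplicate sequence number %r" % (sequence,))
        self._held[sequence] = item
        # Deliver every item that is now next in line.
        while self._next in self._held:
            item = self._held.pop(self._next)
            self._next += 1
            self._deliver(item)


delivered = []
buf = ReorderBuffer(delivered.append)
buf.put(1, "second")  # held back: 0 has not arrived yet
buf.put(0, "first")   # releases both 0 and 1
buf.put(2, "third")
print(delivered)      # ['first', 'second', 'third']
```

As with the queue above, this only helps if sequence numbers are assigned in the order you want the data dispatched, and a sequence number that is never put stalls everything behind it.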
Upvotes: 3