Reputation: 2634
I have a Twisted reactor listening for incoming data. I have a second reactor that performs HTTP requests at certain time intervals and sends the results to the first reactor. Both run fine.
Now I would like to bring them together to run in one reactor, but I do not know how to achieve this. Something like: perform HTTP requests every 60 seconds, asynchronously, from within the first, listening "main" reactor.
What I have at the moment is:
```python
# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())
```
The HTTP reactor uses twisted.internet.defer.DeferredSemaphore() to perform several HTTP checks:
```python
# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)
# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
    # returns deferred
    dl.append(semaphore.run(self._getPage, url))
# get a DeferredList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)
# start the reactor
reactor.run()
```
How can I add the timed HTTP checks to the "main" reactor, so that they get performed asynchronously? How exactly does DeferredSemaphore work?
Can anyone help me with this?
[This is a kind of lightweight monitoring system processing HTTP check results. I am new to Twisted and asynchronous programming. I am on Xubuntu 12.04 running Python 2.7]
Upvotes: 4
Views: 4258
Reputation: 156308
You don't need multiple reactors. Just perform all of the different actions using the same reactor.
If you're calling reactor.stop(), you're likely doing something wrong, so let's get rid of that and tie it all up into a single function (one we could use as a callback). Since it does asynchronous work, it should also return a Deferred; we'll use the DeferredList you're already using.
```python
def thing_that_does_http(self):
    # a method on the same class that defines _urls, _getPage and _handleError
    # create semaphore to manage the deferreds
    semaphore = twisted.internet.defer.DeferredSemaphore(2)
    # collect the deferreds in a plain list first
    # (DeferredList() cannot be constructed without a list of deferreds)
    dl = []
    # append deferreds to list
    for url in self._urls:
        # returns deferred
        dl.append(semaphore.run(self._getPage, url))
    # get a DeferredList
    dl = twisted.internet.defer.DeferredList(dl)
    # add an errback for error handling
    dl.addErrback(self._handleError)
    return dl
```
The natural way to "perform x at certain time intervals" is with a LoopingCall. With this callback function in place, there isn't much left to do:
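```python
import twisted.internet.task
from twisted.internet import reactor

reactor.listenTCP(8123, TCPEventReceiverFactory())
# checker: a hypothetical instance of the class that defines thing_that_does_http()
loop_http = twisted.internet.task.LoopingCall(checker.thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)
# one reactor.run() drives both the listener and the periodic checks
reactor.run()
```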
By default, the reactor that LoopingCall and getPage use for their own purposes is the global twisted.internet.reactor. If you want them to use a different one, for instance a fake reactor in unit tests, you'll need to override that default.
In the case of LoopingCall it's quite simple: after construction (but before calling its start() method), set its clock attribute:
```python
from twisted.internet.task import Clock

fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120)  # move time forward two minutes...
```
Unfortunately, the situation with getPage() is less nice. You cannot make that interface use any other reactor; you'll need to use the newer, shinier twisted.web.client.Agent instead. In many ways Agent is superior, but it's not quite as convenient when you just want the raw response body as a string.
Aside from requiring an explicit reactor passed to its constructor, Agent is more about fine-grained control over the request/response cycle than about the convenience getPage provides. As such, it's implemented mostly in terms of Producers and Protocols. For the former, we can pass a convenience helper, FileBodyProducer, to send request bodies with minimal fuss; for the latter, we'll need a simple protocol that buffers all of the chunks of data until we've received the whole body.
Here's a chunk of code that could replace getPage, with roughly the same interface but taking an instance of Agent as its first argument:
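```python
from cStringIO import StringIO

from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import FileBodyProducer
from twisted.web.client import ResponseDone
from twisted.web.http import PotentialDataLoss


class GetPageProtocol(Protocol):
    """Buffers a response body and fires self.deferred with it as a string."""

    def __init__(self):
        self.deferred = Deferred()
        self.data = []

    def dataReceived(self, data):
        # called once per chunk of the body
        self.data.append(data)

    def connectionLost(self, reason):
        # ResponseDone: the whole body arrived.
        # PotentialDataLoss: the server sent no length, so the body was read
        # until the connection closed; this sketch accepts it as complete.
        if reason.check(ResponseDone, PotentialDataLoss):
            data = ''.join(self.data)
            del self.data
            self.deferred.callback(data)
        else:
            # anything else is a real failure: don't leave the Deferred hanging
            self.deferred.errback(reason)


def agentGetPage(agent, url,
                 method="GET",
                 headers=None,
                 postdata=None):
    if postdata is not None:
        bodyProducer = FileBodyProducer(StringIO(postdata))
    else:
        bodyProducer = None

    def _getPageResponded(response):
        if response.length != 0:
            proto = GetPageProtocol()
            response.deliverBody(proto)
            return proto.deferred
        else:
            # nothing to read; return an empty body, as getPage would
            return ''

    d = agent.request(method, url, headers, bodyProducer)
    d.addCallback(_getPageResponded)
    return d
```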
In a unit test, this would look something like:
```python
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent

fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")
assert fake_reactor.tcpClients  # or some such, exercise the code by manipulating the reactor
```
Edit: I initially wanted to skim over this to give ectomorph less to be confused about; but it's also a pretty good idea to drum in the proper handling of reactors early and avoid needless pain later.
Upvotes: 6