user937284
user937284

Reputation: 2634

Twisted / perform asynchronous http requests

I have a twisted reactor listening for incoming data. I have a second reactor performing http requests in certain time intervals sending the results to the first reactor. Both run fine.

Now I would like to bring it together to run in one reactor, but I do not know how to achieve this. Something like - perform http requests every 60 sec. in an asynchrounous way from within the first listening "main" reactor.

What I have at the moment is:

# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())

The http reactor uses twisted.internet.defer.DeferredSemaphore() to perform several http checks:

# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)

# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
    # returns deferred
    dl.append(semaphore.run(self._getPage, url))

# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)

# start the reactor    
reactor.run()

How can I add the timed http checks to the "main" reactor, so that they get performed in an asynchronous way? How does DeferredSemaphore exactly work?

Can anyone help me with this?

[This is a kind of lightweight monitoring system processing http checkresults. I am new to Twisted and asynchronous programming. I am on Xubuntu 12.04 running Python 2.7]

Upvotes: 4

Views: 4258

Answers (1)

SingleNegationElimination
SingleNegationElimination

Reputation: 156308

you don't need multiple reactors. Just perform all of the different actions using the same reactor.

if you're calling reactor.stop(), you're likely doing something wrong, so lets get rid of that, and tie it all up into a single function (that we could use as a callback); since it is doing asynchronous work, it should also return a deferred, we'll use the DeferredList you're already using.

def thing_that_does_http():
    # create semaphore to manage the deferreds
    semaphore = twisted.internet.defer.DeferredSemaphore(2)

    # create a list with all urls to check
    dl = DeferredList()
    # append deferreds to list
    for url in self._urls:
        # returns deferred
        dl.append(semaphore.run(self._getPage, url))

    # get a DefferedList
    dl = twisted.internet.defer.DeferredList(dl)
    # add some callbacks for error handling
    dl.addErrback(self._handleError)
    return dl

The natural way to "perform x in certain time intervals" is with looping call. With this callback function we don't need to do much

reactor.listenTCP(8123, TCPEventReceiverFactory())
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)

The reactor LoopingCall and getPage will use to for their own purposes is twisted.internet.reactor, if you are using a different reactor, For instance if you are doing unit testing, you'll need to override that default.

In the case of LoopingCall, it's quite simple, after construction, (but before calling its start() method), set its clock attribute:

from twisted.internet.task import Clock
fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120)  # move time forward two minutes...

Unfortunately, the situation with getPage() is less nice. You cannot use any other reactor with that interface; You'll need to use the newer, shinier t.w.c.Agent. In many ways Agent is superior, but it's not quite as convenient when you just want the raw response body as a string.

Aside from requiring an explicit reactor passed to its constructor, it's more about fine grained control over the request/response cycle than the convenience provided by getPage. As such it's implemented mostly in terms of Producers and Protocols. In the case of the former, we can pass a convenience helper, FileBodyProducer to send request bodies with minimal fuss; in the latter, we'll need a simple protocol to buffer all of the chunks of data until we've gotten all of it.

Here's a chunk of code that could replace getPage, with roughly the same interface, but taking an instance of Agent as first argument

from cStringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.client import FileBodyProducer


class GetPageProtocol(Protocol):
    def __init__(self):
        self.deferred = Deferred()
        self.data = []

    def dataReceived(self, data):
        self.data.append(data)

    def connectionLost(self, reason):
        reason.trap(ResponseDone)
        data = ''.join(self.data)
        del self.data
        self.deferred.callback(data)


def agentGetPage(agent, url,
                 method="GET",
                 headers=None,
                 postdata=None):
    if postdata is not None:
        bodyProducer = FileBodyProducer(StringIO(postdata))
    else:
        bodyProducer = None

    def _getPageResponded(response):
        if response.length != 0:
            proto = GetPageProtocol()
            response.deliverBody(proto)
            return proto.deferred
        else:
            return None

    d = agent.request(method, url, headers, bodyProducer)
    d.addCallback(_getPageResponded)
    return d

which, in a unit test, would look sort of like:

from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent
fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")

assert fake_reactor.tcpClients  # or some such, exercise the code by manipulating the reactor

Edit: I initially wanted to skim over this to give ectomorph, less to be confused about; but then it's also a pretty good idea to drum in the proper handling of reactors early, and avoid needless pain later.

Upvotes: 6

Related Questions