Bdfy
Bdfy

Reputation: 24621

How to add timeout to Deferred from Twisted's deferToThread API?

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import defer
import time

def worker(arg):
    print 'Hello world'
     time.sleep(10)
    return 1

def run():
    print 'Starting workers'
    l = []
    for x in range(2):
        l.append(threads.deferToThread(worker, x))
    return defer.DeferredList(l)

def res(results):
    print results
    reactor.stop()

d = run()
d.addCallback(res)
reactor.run()

How to stop workers by timeout ?

Upvotes: 6

Views: 5587

Answers (4)

Carl D'Halluin
Carl D'Halluin

Reputation: 1072

We do it like this using a decorator. This method has the advantage that the deferred is cancelled when the timeout is reached. This should somehow become part of the Twisted library imho

from twisted.internet import defer, reactor

def timeout(secs):
    """Decorator to add timeout to Deferred calls"""
    def wrap(func):
        @defer.inlineCallbacks
        def _timeout(*args, **kwargs):
            raw_d = func(*args, **kwargs)
            if not isinstance(raw_d, defer.Deferred):
                defer.returnValue(raw_d)

            timeout_d = defer.Deferred()
            times_up = reactor.callLater(secs, timeout_d.callback, None)

            try:
                raw_result, timeout_result = yield defer.DeferredList(
                    [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
                    consumeErrors=True)
            except defer.FirstError as e:  # Only raw_d should raise an exception
                assert e.index == 0
                times_up.cancel()
                e.subFailure.raiseException()
            else:  # timeout
                if timeout_d.called:
                    raw_d.cancel()
                    raise Exception("%s secs have expired" % secs)

            # no timeout
            times_up.cancel()
            defer.returnValue(raw_result)
        return _timeout
return wrap

Upvotes: 1

Corey
Corey

Reputation: 1855

While it may not be possible to interrupt the threads, the Deferred can be stopped via the cancel function, which I think is available in Twisted 10.1.0 and later.

I've used the following class to make Deferreds that callback a particular function if the Deferred hasn't fired after some time. It might be useful for someone that has the same question as that posed in the subject of the OP.

EDIT: As suggested by the comments below, it's best not to inherit from defer.Deferred. Therefore, I've changed the code to use a wrapper that achieves the same effect.

class DeferredWrapperWithTimeout(object):
    '''
    Holds a deferred that allows a specified function to be called-back
    if the deferred does not fire before some specified timeout.
    '''
    def __init__(self, canceller=None):
        self._def = defer.Deferred(canceller)

    def _finish(self, r, t):
        '''
        Function to be called (internally) after the Deferred
        has fired, in order to cancel the timeout.
        '''
        if ( (t!=None) and (t.active()) ):
            t.cancel()
        return r

    def getDeferred(self):
        return self._def

    def addTimeoutCallback(self, reactr, timeout,
                           callUponTimeout, *args, **kw):
        '''
        The function 'callUponTimeout' (with optional args or keywords)
        will be called after 'timeout' seconds, unless the Deferred fires.
        '''

        def timeoutCallback():
            self._def.cancel()
            callUponTimeout(*args, **kw)
        toc = reactr.callLater(timeout, timeoutCallback)
        return self._def.addCallback(self._finish, toc)

Example callback before timeout:

from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()

Prints "called" and nothing else.

Example timeout before callback:

from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()

Prints "timedout" after 20 seconds, and nothing else.

Upvotes: 3

o3bvv
o3bvv

Reputation: 5656

Well, my answer is not about threads, but as it was said, you can implement timeout functionality as a separate helper:

from twisted.internet import defer

def add_watchdog(deferred, timeout=0.05):

    def callback(value):
        if not watchdog.called:
            watchdog.cancel()
        return value

    deferred.addBoth(callback)

    from twisted.internet import reactor
    watchdog = reactor.callLater(timeout, defer.timeout, deferred)

d = defer.Deferred()
add_watchdog(d)

Then you can trap defer.TimeoutError in deferred's errback if you need.

Upvotes: 0

Jean-Paul Calderone
Jean-Paul Calderone

Reputation: 48315

Threads cannot be interrupted unless they cooperate with you. time.sleep(10) is not going to cooperate, so I don't think you can interrupt this worker. If you have another kind of worker that has several discrete phases, or operates in a loop over some tasks, then you can do something like this:

def worker(stop, jobs):
    for j in jobs:
        if stop:
            break
        j.do()

stop = []
d = deferToThread(worker)

# This will make the list eval to true and break out of the loop.
stop.append(None)

This isn't Twisted specific, either. This is just how threads work in Python.

Upvotes: 5

Related Questions