Reputation: 24621
from twisted.internet import reactor from twisted.internet import threads from twisted.internet import defer import time def worker(arg): print 'Hello world' time.sleep(10) return 1 def run(): print 'Starting workers' l = [] for x in range(2): l.append(threads.deferToThread(worker, x)) return defer.DeferredList(l) def res(results): print results reactor.stop() d = run() d.addCallback(res) reactor.run()
How to stop workers by timeout ?
Upvotes: 6
Views: 5587
Reputation: 1072
We do it like this using a decorator. This method has the advantage that the deferred is cancelled when the timeout is reached. This should somehow become part of the Twisted library imho
from twisted.internet import defer, reactor
def timeout(secs):
"""Decorator to add timeout to Deferred calls"""
def wrap(func):
@defer.inlineCallbacks
def _timeout(*args, **kwargs):
raw_d = func(*args, **kwargs)
if not isinstance(raw_d, defer.Deferred):
defer.returnValue(raw_d)
timeout_d = defer.Deferred()
times_up = reactor.callLater(secs, timeout_d.callback, None)
try:
raw_result, timeout_result = yield defer.DeferredList(
[raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
consumeErrors=True)
except defer.FirstError as e: # Only raw_d should raise an exception
assert e.index == 0
times_up.cancel()
e.subFailure.raiseException()
else: # timeout
if timeout_d.called:
raw_d.cancel()
raise Exception("%s secs have expired" % secs)
# no timeout
times_up.cancel()
defer.returnValue(raw_result)
return _timeout
return wrap
Upvotes: 1
Reputation: 1855
While it may not be possible to interrupt the threads, the Deferred can be stopped via the cancel
function, which I think is available in Twisted 10.1.0 and later.
I've used the following class to make Deferreds that callback a particular function if the Deferred hasn't fired after some time. It might be useful for someone that has the same question as that posed in the subject of the OP.
EDIT: As suggested by the comments below, it's best not to inherit from defer.Deferred
. Therefore, I've changed the code to use a wrapper that achieves the same effect.
class DeferredWrapperWithTimeout(object):
'''
Holds a deferred that allows a specified function to be called-back
if the deferred does not fire before some specified timeout.
'''
def __init__(self, canceller=None):
self._def = defer.Deferred(canceller)
def _finish(self, r, t):
'''
Function to be called (internally) after the Deferred
has fired, in order to cancel the timeout.
'''
if ( (t!=None) and (t.active()) ):
t.cancel()
return r
def getDeferred(self):
return self._def
def addTimeoutCallback(self, reactr, timeout,
callUponTimeout, *args, **kw):
'''
The function 'callUponTimeout' (with optional args or keywords)
will be called after 'timeout' seconds, unless the Deferred fires.
'''
def timeoutCallback():
self._def.cancel()
callUponTimeout(*args, **kw)
toc = reactr.callLater(timeout, timeoutCallback)
return self._def.addCallback(self._finish, toc)
Example callback before timeout:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()
Prints "called" and nothing else.
Example timeout before callback:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()
Prints "timedout" after 20 seconds, and nothing else.
Upvotes: 3
Reputation: 5656
Well, my answer is not about threads, but as it was said, you can implement timeout functionality as a separate helper:
from twisted.internet import defer
def add_watchdog(deferred, timeout=0.05):
def callback(value):
if not watchdog.called:
watchdog.cancel()
return value
deferred.addBoth(callback)
from twisted.internet import reactor
watchdog = reactor.callLater(timeout, defer.timeout, deferred)
d = defer.Deferred()
add_watchdog(d)
Then you can trap defer.TimeoutError
in deferred's errback if you need.
Upvotes: 0
Reputation: 48315
Threads cannot be interrupted unless they cooperate with you. time.sleep(10)
is not going to cooperate, so I don't think you can interrupt this worker. If you have another kind of worker that has several discrete phases, or operates in a loop over some tasks, then you can do something like this:
def worker(stop, jobs):
for j in jobs:
if stop:
break
j.do()
stop = []
d = deferToThread(worker)
# This will make the list eval to true and break out of the loop.
stop.append(None)
This isn't Twisted specific, either. This is just how threads work in Python.
Upvotes: 5