Charles

Reputation: 3774

Waiting on event with Twisted and PB

I have a Python app that uses multiple threads, and I am curious about the best way to wait for something in Python without burning CPU or holding the GIL.

My app uses Twisted, and I spawn a thread to run a long operation so that I do not block the reactor thread. This long operation also spawns some threads using Twisted's deferToThread to do something else, and the original thread wants to wait for the results from the Deferreds.

What I have been doing is this:

while self._waiting:
    time.sleep( 0.01 )

which seemed to prevent Twisted PB objects from receiving messages, so I thought sleep was holding the GIL. Further investigation by the posters below revealed, however, that it does not.

The answers below describe better ways to wait on threads without blocking the reactor thread or the Python interpreter.

Upvotes: 2

Views: 2899

Answers (5)

Glyph

Reputation: 31910

If you're already using Twisted, you should never need to "wait" like this.

As you've described it:

I spawn a thread to run a long operation ... This long operation also spawns some threads using twisted's deferToThread ...

That implies that you're calling deferToThread from your "long operation" thread, not from your main thread (the one where reactor.run() is running). As Jean-Paul Calderone already noted in a comment, you can only call Twisted APIs (such as deferToThread) from the main reactor thread.

The lock-up that you're seeing is a common symptom of not following this rule. It has nothing to do with the GIL, and everything to do with the fact that you have put Twisted's reactor into a broken state.

Based on your loose description of your program, I've tried to write a sample program that does what you're talking about based entirely on Twisted APIs, spawning all threads via Twisted and controlling them all from the main reactor thread.

import time

from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread, blockingCallFromThread

def workReallyHard():
    "'Work' function, invoked in a thread."
    time.sleep(0.2)

def longOperation():
    for x in range(10):
        workReallyHard()
        blockingCallFromThread(reactor, startShortOperation, x)
    result = blockingCallFromThread(reactor, gatherResults, shortOperations)
    return 'hooray', result

def shortOperation(value):
    workReallyHard()
    return value * 100

shortOperations = []

def startShortOperation(value):
    def done(result):
        print('Short operation complete!', result)
        return result
    shortOperations.append(
        deferToThread(shortOperation, value).addCallback(done))

d = deferToThread(longOperation)
def allDone(result):
    print('Long operation complete!', result)
    reactor.stop()
d.addCallback(allDone)

reactor.run()

Note that at the point in allDone where the reactor is stopped, you could fire off another "long operation" and have it start the process all over again.

Upvotes: 13

ʇsәɹoɈ

Reputation: 23509

According to the Python source, time.sleep() does not hold the GIL.

http://code.python.org/hg/trunk/file/98e56689c59c/Modules/timemodule.c#l920

Note the use of Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS, as documented here:

http://docs.python.org/c-api/init.html#thread-state-and-the-global-interpreter-lock
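
If you would rather check this empirically than read the C source, here is a rough sketch using only the standard library. The function name longest_stall_during_sleep is made up for this example. A worker thread spins while the calling thread sleeps, and the function reports the longest pause the worker ever saw between two iterations. If time.sleep() held the GIL, that pause would be about as long as the sleep itself; in practice it should be far shorter, though the exact number depends on your machine and scheduler.

```python
import threading
import time

def longest_stall_during_sleep(sleep_seconds=0.5):
    """Sleep in the calling thread while a worker spins; return the longest
    gap (in seconds) between two consecutive iterations of the worker."""
    stop = threading.Event()
    longest_gap = [0.0]  # one-element list so the worker can update it

    def worker():
        previous = time.monotonic()
        while not stop.is_set():
            now = time.monotonic()
            longest_gap[0] = max(longest_gap[0], now - previous)
            previous = now

    thread = threading.Thread(target=worker)
    thread.start()
    time.sleep(sleep_seconds)  # if this held the GIL, the worker would stall here
    stop.set()
    thread.join()
    return longest_gap[0]

if __name__ == "__main__":
    print("longest worker stall: %.4f s" % longest_stall_during_sleep())
```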

Upvotes: 2

Alex Martelli

Reputation: 882751

I recently found out that calling time.sleep( X ) will lock the GIL for the entire time X and therefore freeze ALL python threads for that time period.

You found wrongly -- this is definitely not how it works. What's the source where you found this mis-information?

Anyway, then you clarify (in comments -- better edit your Q!) that you're using deferToThread and your problem with this is that...:

Well yes I defer the action to a thread and give twisted a callback. But the parent thread needs to wait for the whole series of sub threads to complete before it can move onto a new set of sub threads to spawn

So use as the callback a method of an object with a counter -- start it at 0, increment it by one every time you're deferring-to-thread and decrement it by one in the callback method.

When the callback method sees that the decremented counter has gone back to 0, it knows that we're done waiting "for the whole series of sub threads to complete" and then the time has come to "move on to a new set of sub threads to spawn", and thus, in that case only, calls the "spawn a new set of sub threads" function or method -- it's that easy!

E.g. (net of typos &c as this is untested code, just to give you the idea)...:

from twisted.internet import threads

class Waiter(object):

  def __init__(self, what_next, *a, **k):
    self.counter = 0
    self.what_next = what_next
    self.a = a
    self.k = k

  def one_more(self):
    self.counter += 1

  def do_wait(self, *dont_care):
    self.counter -= 1
    if self.counter == 0:
      # Pass this waiter along: spawn_all expects it as its first argument.
      self.what_next(self, *self.a, **self.k)


def spawn_one_thread(waiter, long_calculation, *a, **k):
  waiter.one_more()
  d = threads.deferToThread(long_calculation, *a, **k)
  d.addCallback(waiter.do_wait)

def spawn_all(waiter, list_of_lists_of_functions_args_and_kwds):
  if not list_of_lists_of_functions_args_and_kwds:
    return
  if waiter is None:
    waiter = Waiter(spawn_all, list_of_lists_of_functions_args_and_kwds)
  this_time = list_of_lists_of_functions_args_and_kwds.pop(0)
  for f, a, k in this_time:
    spawn_one_thread(waiter, f, *a, **k)

def start_it_all(list_of_lists_of_functions_args_and_kwds):
  spawn_all(None, list_of_lists_of_functions_args_and_kwds)

Upvotes: 5

Brian Clapper

Reputation: 26230

The threading module allows you to spawn a thread, which is then represented by a Thread object. That object has a join method that you can use to wait for the subthread to complete.

See http://docs.python.org/library/threading.html#module-threading
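
A minimal sketch of that, using only the standard library. The helper name run_batch and its (function, args) job format are invented for this example. Each job runs in its own thread and the caller blocks in join() until all of them are done, without a sleep loop.

```python
import threading

def run_batch(jobs):
    """Run each (function, args) pair in its own thread and block until
    all of them have finished. Results come back in job order."""
    results = [None] * len(jobs)

    def call(index, func, args):
        results[index] = func(*args)

    workers = [threading.Thread(target=call, args=(i, func, args))
               for i, (func, args) in enumerate(jobs)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()  # blocks the calling thread without spinning
    return results

if __name__ == "__main__":
    print(run_batch([(pow, (2, 10)), (sum, ([1, 2, 3],))]))
```

Note that join() blocks whichever thread calls it, so in a Twisted program this only makes sense from a worker thread, never from the reactor thread.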

Upvotes: 1

Dirk

Reputation: 31061

Have you tried condition variables? They are used like

from threading import Condition

condition = Condition()

def consumer_in_thread_A():
    condition.acquire()
    try:
        while resource_not_yet_available:
            condition.wait()
        # Here, the resource is available and may be 
        # consumed
    finally:
        condition.release()

def produce_in_thread_B():
    # ... create resource, whatsoever
    condition.acquire()
    try:
        condition.notify_all()
    finally:
        condition.release()

Condition variables act as locks (acquire and release), but their main purpose is to let one thread wait until another thread signals it with notify or notify_all.
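
For something you can actually run, here is a small self-contained sketch of the same pattern. The shared list resource, the function names, and the value 42 are placeholders for this example, not anything from the question. It uses the with statement, which acquires and releases the condition just like the explicit try/finally form.

```python
import threading

condition = threading.Condition()
resource = []  # hypothetical shared resource: empty means "not yet available"

def consume(results):
    with condition:  # equivalent to acquire() ... release()
        while not resource:  # re-check after every wakeup
            condition.wait()  # releases the lock while waiting
        results.append(resource.pop())

def produce(value):
    with condition:
        resource.append(value)  # change the shared state under the lock
        condition.notify_all()  # wake every waiting consumer

def demo():
    results = []
    consumer = threading.Thread(target=consume, args=(results,))
    consumer.start()
    produce(42)
    consumer.join()
    return results

if __name__ == "__main__":
    print(demo())
```

The while loop around wait() matters: it keeps the consumer correct whether the producer runs before or after the consumer starts waiting.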

Upvotes: 5
