ilya

Reputation: 301

Join one of many threads in Python

I have a Python program with one main thread and, let's say, 2 other threads (or maybe more; it probably doesn't matter). I would like the main thread to sleep until ONE of the other threads has finished. It's easy to do with polling (by calling t.join(1) and waiting up to one second for every thread t).

Is it possible to do it without polling, just by

SOMETHING_LIKE_JOIN(1, [t1, t2])

where t1 and t2 are threading.Thread objects? The call must do the following: sleep for up to 1 second, but wake up as soon as one of t1, t2 has finished. This is quite similar to the POSIX select(2) call with two file descriptors.

Upvotes: 3

Views: 730

Answers (3)

Ma Tingchen

Reputation: 171

Here is an example using a Condition object.

from threading import Thread, Condition, Lock
from time import sleep
from random import random


_lock = Lock()


def run(idx, condition):
    sleep(random() * 3)
    print('thread_%d is waiting for notifying main thread.' % idx)
    _lock.acquire()
    with condition:
        print('thread_%d notifies main thread.' % idx)
        condition.notify()


def is_working(thread_list):
    for t in thread_list:
        if t.is_alive():
            return True
    return False


def main():
    condition = Condition(Lock())
    thread_list = [Thread(target=run, kwargs={'idx': i, 'condition': condition}) for i in range(10)]

    with condition:
        with _lock:
            for t in thread_list:
                t.start()

            while is_working(thread_list):
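                # After a timeout _lock may already be free; releasing an
                # unlocked Lock raises RuntimeError, so check first.
                if _lock.locked():
                    _lock.release()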
                if condition.wait(timeout=1):
                    print('do something')
                    sleep(1)  # <-- Main thread is doing something.
                else:
                    print('timeout')

    for t in thread_list:
        t.join()


if __name__ == '__main__':
    main()

I don't think there is a race condition as you described in your comment. The Condition object contains a Lock. While the main thread is working (the sleep(1) in the example), it holds that lock, so no thread can notify it until it finishes its work and releases the lock.


I just realized that there is a race condition in the previous example. I added a global _lock to ensure that no thread notifies the condition until the main thread has started waiting. I don't like how it works, but I haven't figured out a better solution...

Upvotes: 1

ShadowRanger

Reputation: 155363

One solution is to use a multiprocessing.dummy.Pool; multiprocessing.dummy provides an API almost identical to multiprocessing, but backed by threads, so it gets you a thread pool for free.

For example, you can do:

from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool(2)  # Two workers
for res in pool.imap_unordered(some_func, list_of_func_args):
    # res is whatever some_func returned
    print(res)

multiprocessing.Pool.imap_unordered yields results as they become available, in whatever order the tasks finish rather than the order in which they were submitted.

If you can use Python 3.2 or higher (or install the concurrent.futures PyPI module for older Python) you can generalize to disparate task functions by creating one or more Futures from a ThreadPoolExecutor, then using concurrent.futures.wait with return_when=FIRST_COMPLETED, or using concurrent.futures.as_completed for similar effect.
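A minimal sketch of the concurrent.futures.wait approach, assuming Python 3.2+. The work function, the task names and the delays are made up for illustration; only ThreadPoolExecutor, wait and FIRST_COMPLETED come from the standard library.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def work(name, delay):
    # Hypothetical task: sleep for `delay` seconds, then return its name.
    time.sleep(delay)
    return name


def first_finished(timeout=1.0):
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(work, 'fast', 0.1),
                   executor.submit(work, 'slow', 0.6)]
        # Blocks until at least one future completes or `timeout` elapses;
        # no polling loop is needed.
        done, not_done = wait(futures, timeout=timeout,
                              return_when=FIRST_COMPLETED)
        # Names of the finished tasks and the number still running.
        return sorted(f.result() for f in done), len(not_done)


if __name__ == '__main__':
    print(first_finished())
```

If the timeout expires before any task finishes, done comes back empty, which is the closest match to the SOMETHING_LIKE_JOIN(1, [t1, t2]) call in the question. Note that leaving the with block still waits for the remaining tasks to finish.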

Upvotes: 2

Rotsechs96

Reputation: 11

You can create a Thread subclass and have the main thread keep a reference to each instance. That way you can easily check whether a thread has finished and let the main thread continue.

If that doesn't help, I suggest you look at the Queue library (the queue module in Python 3)!

import threading
import time, random


#THREAD CLASS#
class Thread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

        self.daemon = True
        self.state = False

        #START THREAD (THE RUN METHOD)#
        self.start()

    #THAT IS WHAT THE THREAD ACTUALLY DOES#
    def run(self):

        #THREAD SLEEPS FOR A RANDOM TIME RANGE#
        time.sleep(random.randrange(5, 10))

        #AFTERWARDS IT HAS FINISHED (STORE IN VARIABLE)#
        self.state = True


    #RETURNS THE STATE#
    def getState(self):

        return self.state


#10 SEPARATE THREADS#
threads = []

for i in range(10):
    threads.append(Thread())

#MAIN THREAD#
while True:

    #RUN THROUGH ALL THREADS AND CHECK THEIR STATE#
    for i in range(len(threads)):
        if threads[i].getState():
            print("WAITING IS OVER: THREAD", i)

    #SLEEPS ONE SECOND#
    time.sleep(1)
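For the Queue suggestion, here is a rough sketch of how it could look without polling, assuming Python 3 (where the module is named queue). The worker function, the delays and the wait_for_first helper are made up for illustration: each thread puts its index on a shared queue when it is done, and the main thread blocks on get() with a timeout.

```python
import queue
import random
import threading
import time


def worker(idx, done_queue):
    # Hypothetical task: sleep briefly, then announce completion.
    time.sleep(random.uniform(0.05, 0.2))
    done_queue.put(idx)


def wait_for_first(num_threads=3, timeout=1.0):
    done_queue = queue.Queue()
    threads = [threading.Thread(target=worker, args=(i, done_queue))
               for i in range(num_threads)]
    for t in threads:
        t.start()
    try:
        # Blocks until some worker reports in, or raises queue.Empty
        # after `timeout` seconds.
        first = done_queue.get(timeout=timeout)
    except queue.Empty:
        first = None
    # Clean up the remaining threads before returning.
    for t in threads:
        t.join()
    return first


if __name__ == '__main__':
    print('first finished thread:', wait_for_first())
```

One caveat: a worker reports just before it returns, so is_alive() may still be true for a brief moment after the main thread wakes up. Calling join() on the reported thread closes that gap.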

Upvotes: -1
