kyp4

Reputation: 152

A way to wait for currently running tasks to finish then stop in multiprocessing Pool

I have a large number of tasks (40,000 to be exact) that I am using a Pool to run in parallel. To maximize efficiency, I pass the list of all tasks at once to starmap and let them run.

I would like to have it so that if my program is interrupted with Ctrl+C then currently running tasks will be allowed to finish, but new ones will not be started. I have figured out the signal handling part for Ctrl+C using the recommended method, and it works well (at least with Python 3.6.9, which I am using):

import os
import signal
import random as rand
import multiprocessing as mp

def init() :
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def child(a, b, c) :
    st = rand.randrange(5, 20+1)
    print("Worker thread", a+1, "sleep for", st, "...")
    os.system("sleep " + str(st))

pool = mp.Pool(initializer=init)

try :
    pool.starmap(child, [(i, 2*i, 3*i) for i in range(10)])
    pool.close()
    pool.join()
    print("True exit!")
except KeyboardInterrupt :
    pool.terminate()
    pool.join()
    print("Interrupted exit!")

The problem is that Pool seems to have no function that lets the currently running tasks complete and then stops; it only has terminate and close. In the example above I use terminate, but this is not what I want, since it immediately terminates all running tasks (whereas I want the currently running tasks to run to completion). close, on the other hand, only prevents adding more tasks, and calling close then join waits for all pending tasks to complete (40,000 of them in my real case), whereas I only want the currently running tasks to finish, not all of them.

I could somehow add my tasks gradually, one by one or in chunks, so that I could use close and join when interrupted, but this seems less efficient unless there is a way to manually add a new task as soon as one finishes (which I don't see how to do from the Pool documentation). My use case seems like it would be common, and it feels like Pool should have a function for this, but I have not seen this question asked anywhere (or maybe I'm just not searching for the right thing).

Does anyone know how to accomplish this easily?

Upvotes: 1

Views: 1272

Answers (1)

wwii

Reputation: 23753

I tried to do something similar with concurrent.futures - see the last code block in this answer: it attempts to throttle adding tasks to the pool, only adding new tasks as others complete. You could change the logic to fit your needs. Maybe keep the number of pending work items slightly greater than the number of workers so you don't starve the executor. Something like:

import concurrent.futures
import random as rand
import signal
import sys
import time

def child(*args, n=0):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    a,b,c = args
    st = rand.randrange(1, 5)
    time.sleep(st)
    x = f"Worker {n} thread {a+1} slept for {st} - args:{args}"
    return (n,x)


if __name__ == '__main__':
    nworkers = 5    # ncpus?
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        data = ((i, 2*i, 3*i) for i in range(100))
        for n,args in enumerate(data):
            try:
                # limit pending tasks
                while len(executor._pending_work_items) >= nworkers + 2:
                    # wait till one completes and get the result
                    futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                    #print(futures)
                    results.extend(future.result() for future in futures.done)
                    print(f'{len(results)} results so far')
                    fs = list(futures.not_done)
                print(f'add a new task {n}')

                fs.append(executor.submit(child, *args,**{'n':n}))
            except KeyboardInterrupt as e:
                print('ctrl-c!!', file=sys.stderr)
                # don't add any more tasks
                break
        # get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'{len(executor._pending_work_items)} tasks pending:')
            result = future.result()
            results.append(result)
    results.sort()
    # separate the results from the value used to sort
    for n,result in results:
        print(result)

Here is a way to get the results sorted in submission order without modifying the task. It uses a dictionary to relate each future to its submission order and uses it for the sort key.

# same imports
def child(*args):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    a,b,c = args
    st = rand.randrange(1, 5)
    time.sleep(st)
    x = f"Worker thread {a+1} slept for {st} - args:{args}"
    return x


if __name__ == '__main__':
    nworkers = 5    # ncpus?
    sort_dict = {}
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        data = ((i, 2*i, 3*i) for i in range(100))
        for n,args in enumerate(data):
            try:
                # limit pending tasks
                while len(executor._pending_work_items) >= nworkers + 2:
                    # wait till one completes and grab it
                    futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                    results.extend(future for future in futures.done)
                    print(f'{len(results)} futures completed so far')
                    fs = list(futures.not_done)
                future = executor.submit(child, *args)
                fs.append(future)
                print(f'task {n} added - future:{future}')
                sort_dict[future] = n
            except KeyboardInterrupt as e:
                print('ctrl-c!!',file=sys.stderr)
                # don't add any more tasks
                break
        # get leftover futures as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'{len(executor._pending_work_items)} tasks pending:')
            results.append(future)
    #sort the futures
    results.sort(key=lambda f: sort_dict[f])
    # get the results
    for future in results:
        print(future.result())

You could also just add an attribute to each future and sort on that (no need for the dictionary):

...
                future = executor.submit(child, *args)
                # add an attribute to the future that can be sorted on
                future.submitted = n
                fs.append(future)

...
    results.sort(key=lambda f: f.submitted)

Upvotes: 1
