rmeertens

Reputation: 4451

How to kill threads spawned when using the multiprocessing's Pool imap_unordered

I'm trying to speed up a simple Python program using multiprocessing's Pool, specifically its imap_unordered function.

In my case, I'm searching for an object with a specific property, and checking this property takes a long time, which is why I want to spread the load over my CPU cores.

I created the following code:

from multiprocessing import Pool as ThreadPool 
pool = ThreadPool(4) 

some_iterator = (create_item() for _ in range(100000))

results = pool.imap_unordered(my_function, some_iterator)

for result in results:
  if is_favourable(result):
    break

Unfortunately, after calling break there is still a lot of activity in the worker processes (as can be seen in my computer's activity monitor). How can I keep searching until I find a favourable result, and then stop the imap_unordered iterator from processing the remaining items?

Upvotes: 1

Views: 1869

Answers (2)

martineau

Reputation: 123501

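For starters, your example code is not using a multiprocessing ThreadPool, because your import statement is wrong: from multiprocessing import Pool as ThreadPool just makes the regular process-based Pool class available under the name ThreadPool. The thread-based class lives in multiprocessing.pool.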
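A quick check along these lines (a sketch; worker_pid is a hypothetical helper) confirms that multiprocessing.pool.ThreadPool is a thread-based subclass of multiprocessing.pool.Pool whose tasks run inside the parent process. Using the process-based Pool instead would report child-process PIDs (that variant needs an if __name__ == '__main__': guard on platforms that spawn workers).

```python
import os
from multiprocessing.pool import Pool, ThreadPool


def worker_pid(_):
    """Return the PID of whatever process runs this task (hypothetical helper)."""
    return os.getpid()


# ThreadPool is a thread-backed subclass of the process-based Pool class.
is_subclass = issubclass(ThreadPool, Pool)

# Thread workers live inside the current process, so every task reports our PID.
with ThreadPool(4) as pool:
    pids = set(pool.map(worker_pid, range(8)))
same_process = pids == {os.getpid()}

print(is_subclass, same_process)  # True True
```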

Regardless, since Python 3.3 you can use the Pool/ThreadPool as a context manager and put the loop inside the with block. Its terminate() method is then called automatically when the context is exited (because of the break statement in the example below), which immediately stops the worker processes. With a ThreadPool, running threads cannot be killed: queued tasks are discarded, but a task that is already executing finishes in the background.

from multiprocessing.pool import ThreadPool
from random import randint
from threading import current_thread
import time

def create_item():
    return randint(0, 20)

def is_favourable(value):
    return value < 20

def my_function(value):
    # ThreadPool workers are threads in one process, so report the thread name
    # (current_process().name would print 'MainProcess' for every worker).
    print(current_thread().name, value)
    time.sleep(2)
    return value * 2

if __name__ == '__main__':
    with ThreadPool(4) as pool:  # Use as context manager (Python 3.3+)
        some_iterator = (create_item() for _ in range(10000))
        start = time.time()
        results = pool.imap_unordered(my_function, some_iterator)
        for result in results:
            print('result:', result)
            if is_favourable(result):
                break  # Stop loop and exit Pool context.

    print('done')
    print(time.time() - start)
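One way to confirm that leaving the with block really terminates the pool is to submit work afterwards: a pool that is no longer running raises ValueError. This sketch uses a ThreadPool and a hypothetical square() helper:

```python
from multiprocessing.pool import ThreadPool


def square(x):
    """Hypothetical task used only for this check."""
    return x * x


with ThreadPool(2) as pool:
    first = pool.apply(square, (3,))

# Exiting the with block called pool.terminate(), so new work is rejected.
try:
    pool.apply(square, (6,))
    accepts_work = True
except ValueError:  # "Pool not running"
    accepts_work = False

print(first, accepts_work)  # 9 False
```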

If you're using an older version of Python, you can just explicitly call pool.terminate() immediately before the break statement (and not use a with statement).
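Without context-manager support, the explicit call can go in a finally clause so the pool is also terminated if the loop raises. A minimal sketch, assuming a hypothetical slow_double() in place of my_function and a hypothetical result >= 10 test in place of is_favourable(); it uses ThreadPool so it runs without a __main__ guard, but Pool has the same terminate() and join() methods:

```python
import time
from multiprocessing.pool import ThreadPool


def slow_double(value):
    """Hypothetical stand-in for my_function: a slow task."""
    time.sleep(0.1)
    return value * 2


start = time.time()
pool = ThreadPool(4)
found = None
try:
    for result in pool.imap_unordered(slow_double, range(100)):
        if result >= 10:  # hypothetical "favourable" test
            found = result
            break
finally:
    pool.terminate()  # discard the tasks that are still queued
    pool.join()       # wait for the worker threads to exit
elapsed = time.time() - start

print(found, elapsed)
```

Processing all 100 items would take about 2.5 seconds with four workers; terminating after the first hit returns well before that.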

Upvotes: 3

Mark Tolonen

Reputation: 177991

Pool.terminate() immediately stops the worker processes, while Pool.close() only prevents new tasks from being submitted; the workers exit once all outstanding tasks have completed.
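The difference is easy to observe. The sketch below (a hypothetical count_completed() helper, using ThreadPool so it runs without a __main__ guard; Pool exposes the same close()/terminate()/join() methods) queues 40 slow tasks and stops the pool shortly after starting them:

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def count_completed(stop, n_tasks=40):
    """Queue n_tasks slow tasks, stop the pool early with `stop`
    ('close' or 'terminate'), and return how many tasks actually ran."""
    completed = []
    lock = threading.Lock()

    def task(i):
        time.sleep(0.05)
        with lock:
            completed.append(i)

    pool = ThreadPool(4)
    pool.map_async(task, range(n_tasks), chunksize=1)
    time.sleep(0.12)  # let the first few tasks finish
    if stop == 'close':
        pool.close()      # no new submissions, but queued tasks still run
    else:
        pool.terminate()  # queued tasks are discarded
    pool.join()
    return len(completed)


closed = count_completed('close')
terminated = count_completed('terminate')
print(closed, terminated)
```

With close() all 40 tasks still run; with terminate() only the tasks a worker had already picked up complete. A ThreadPool cannot interrupt a thread in the middle of a task, whereas Pool.terminate() kills worker processes outright.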

Pool.terminate() is also called when the Pool instance is garbage-collected, or when a with block that uses the pool as a context manager exits, so the following is a solution:

import multiprocessing as mp
import time

def my_function(item):
    print(mp.current_process().name,item)
    time.sleep(2) # imitate a long process
    return item * 2

def is_favourable(item):
    return item == 20   # something to look for (result of item 10)

def find():
    with mp.Pool() as pool:
        some_iterator = range(100)
        results = pool.imap_unordered(my_function, some_iterator)
        for result in results:
            print(result)
            if is_favourable(result):
                return result  # pool will be terminated exiting with.

if __name__ == '__main__':
    start = time.time()
    find()
    print(time.time() - start)

Processing the items one at a time, finding item 10 would take 22 seconds (11 items at 2 seconds each). On my 8-core system it finds it in about 4 seconds:

SpawnPoolWorker-2 0
SpawnPoolWorker-3 1
SpawnPoolWorker-1 2
SpawnPoolWorker-5 3
SpawnPoolWorker-4 4
SpawnPoolWorker-8 5
SpawnPoolWorker-7 6
SpawnPoolWorker-6 7
SpawnPoolWorker-1 8
SpawnPoolWorker-3 9
SpawnPoolWorker-2 10
4
2
0
8
SpawnPoolWorker-4 11
SpawnPoolWorker-8 12
10
SpawnPoolWorker-5 13
6
12
SpawnPoolWorker-7 14
SpawnPoolWorker-6 15
14
SpawnPoolWorker-3 16
18
SpawnPoolWorker-1 17
SpawnPoolWorker-2 18
16
20
4.203129768371582
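These timings follow from simple batch arithmetic. Under the idealised assumptions that items are started in input order and each call takes exactly 2 seconds, item 10 is the 11th item: one worker needs 11 sequential calls, while eight workers finish it in the second round of tasks. A small hypothetical helper makes this explicit:

```python
import math


def estimated_seconds(target_index, workers, task_seconds=2.0):
    """Idealised wall time until item `target_index` is done, assuming items
    start in input order, every task takes task_seconds, and no overhead."""
    batches = math.ceil((target_index + 1) / workers)
    return batches * task_seconds


print(estimated_seconds(10, workers=1))  # 22.0
print(estimated_seconds(10, workers=8))  # 4.0
```

The measured 4.2 seconds sits slightly above the 4-second estimate, presumably reflecting worker start-up and scheduling overhead.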

Upvotes: 3
