Reputation: 4451
I'm trying to speed up a simple Python program using multiprocessing's Pool. Specifically: the imap_unordered function.
In my case I'm searching for a specific object with specific properties, and checking this property takes a long time, hence the reason I want to spread the load over my CPU cores.
I created the following code:
from multiprocessing import Pool as ThreadPool
pool = ThreadPool(4)
some_iterator = (create_item() for _ in range(100000))
results = pool.imap_unordered(my_function, some_iterator)
for result in results:
if is_favourable(result):
break
Unfortunately, after calling break, there is still a lot of activity in the threads (as can be observed in my computers activity monitor). How should I keep searching for results till I find a favourable one, or how can I stop iterating over all items using the imap_unordered iterator?
Upvotes: 1
Views: 1869
Reputation: 123501
For starters, your example code is not using a multiprocessing
ThreadPool
because your import
statement is wrong (it's just allowing access to the regular Pool
class via that name).
Regardless, you can just use the Pool
/ThreadPool
as a context manager since Python 3.3 and put the loop inside it. This will cause its terminate()
method to be called automatically when the context is exited (due to the break
statement in the example below), and it will will immediately stop the working processes.
from multiprocessing import current_process
from multiprocessing.pool import ThreadPool
from random import randint
import time
def create_item():
return randint(0, 20)
def is_favourable(value):
return value < 20
def my_function(value):
print(current_process().name, value)
time.sleep(2)
return value * 2
if __name__ == '__main__':
with ThreadPool(4) as pool: # Use as context manager (Python 3.3+)
some_iterator = (create_item() for _ in range(10000))
start = time.time()
results = pool.imap_unordered(my_function, some_iterator)
for result in results:
print('result:', result)
if is_favourable(result):
break # Stop loop and exit Pool context.
print('done')
print(time.time() - start)
If you're using an older version of Python, you can just explicitly call pool.terminate()
immediately before the break
statement (and not use a with
statement).
Upvotes: 3
Reputation: 177991
Pool.terminate()
will immediately stop the working processes, while Pool.close()
will stop submitting tasks and the processes will close once their current task is done.
Pool.terminate()
will also be called if the Pool
instance is garbage-collected, or by using it with with
, so the following is a solution:
import multiprocessing as mp
import time
def my_function(item):
print(mp.current_process().name,item)
time.sleep(2) # imitate a long process
return item * 2
def is_favourable(item):
return item == 20 # something to look for (result of item 10)
def find():
with mp.Pool() as pool:
some_iterator = range(100)
results = pool.imap_unordered(my_function, some_iterator)
for result in results:
print(result)
if is_favourable(result):
return result # pool will be terminated exiting with.
if __name__ == '__main__':
start = time.time()
find()
print(time.time() - start)
A single thread would find item 10 in 22 seconds. On my 8-core system it finds it in ~4 seconds:
SpawnPoolWorker-2 0
SpawnPoolWorker-3 1
SpawnPoolWorker-1 2
SpawnPoolWorker-5 3
SpawnPoolWorker-4 4
SpawnPoolWorker-8 5
SpawnPoolWorker-7 6
SpawnPoolWorker-6 7
SpawnPoolWorker-1 8
SpawnPoolWorker-3 9
SpawnPoolWorker-2 10
4
2
0
8
SpawnPoolWorker-4 11
SpawnPoolWorker-8 12
10
SpawnPoolWorker-5 13
6
12
SpawnPoolWorker-7 14
SpawnPoolWorker-6 15
14
SpawnPoolWorker-3 16
18
SpawnPoolWorker-1 17
SpawnPoolWorker-2 18
16
20
4.203129768371582
Upvotes: 3