Reputation: 4647
Python 3.2 introduced Concurrent Futures, which appear to be some advanced combination of the older threading and multiprocessing modules.
What are the advantages and disadvantages of using this for CPU bound tasks over the older multiprocessing module?
This article suggests they're much easier to work with - is that the case?
Upvotes: 277
Views: 142790
Reputation: 44283
Most of the time when you need parallel processing, you will probably find that either the ProcessPoolExecutor class from the concurrent.futures module or the Pool class from the multiprocessing package provides equivalent facilities, and the choice boils down to a matter of personal preference. But each does offer some facilities that make certain processing more convenient. I thought I would just point out a few:
Submitting Multiple Tasks
Each package has analogs to the built-in map and itertools.starmap functions. If you have a worker function that takes a single argument, then multiple tasks can be submitted using the map method with either package:
def worker_function(x):
    # Return the square of the passed argument:
    return x ** 2

# The pools are created under the main guard so that the code also works
# with the "spawn" start method (the default on Windows and macOS):
if __name__ == '__main__':
    # multiprocessing.pool example:
    from multiprocessing import Pool
    with Pool() as pool:
        squares = pool.map(worker_function, (1, 2, 3, 4, 5, 6))

    # concurrent.futures example:
    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as executor:
        squares = list(executor.map(worker_function, (1, 2, 3, 4, 5, 6)))
Note that the multiprocessing.pool.Pool.map method returns a list, while the concurrent.futures.ProcessPoolExecutor.map method returns an iterator, just like the built-in map function.
Both map methods take a chunksize argument that batches submitted tasks into "chunks" that are pulled off the task input queue, so that a pool process will process all the tasks in a chunk before getting the next chunk from the queue. This results in fewer but larger writes to and reads from the input task queue. For large iterables passed to the map method, chunking up the tasks can greatly improve performance.
If not specified, the default chunksize value for concurrent.futures.ProcessPoolExecutor is 1, which means no chunking. For multiprocessing.pool.Pool the default value is None, which results in the class calculating a "suitable" chunksize based on the pool size and the number of elements in the passed iterable. At the time of this writing, the chunksize value is computed more or less as int(math.ceil(iterable_size / (4 * pool_size))). When doing multithreading with these packages (discussed briefly later), the default chunksize value for both packages is 1.
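To put numbers on that formula, here is a small sketch that evaluates it and also counts how many chunks (queue items) result. The helper name pool_chunking is hypothetical and not part of either library; it only reproduces the expression quoted above, and CPython's internal computation could change between versions. One consequence worth noticing: because of the ceiling, you never get more than 4 * pool_size chunks, however large the input.

```python
import math

def pool_chunking(iterable_size, pool_size):
    """Return (chunksize, number_of_chunks) for Pool.map's default chunksize=None.

    Hypothetical helper: it just evaluates the formula quoted in the text.
    """
    if iterable_size == 0:
        return 0, 0
    chunksize = int(math.ceil(iterable_size / (4 * pool_size)))
    number_of_chunks = math.ceil(iterable_size / chunksize)
    return chunksize, number_of_chunks

# 100 tasks, 8 processes: ceil(100 / 32) = 4 tasks per chunk -> 25 queue items instead of 100.
print(pool_chunking(100, 8))        # (4, 25)
# 1,000,000 tasks, 8 processes: 31250 tasks per chunk -> only 32 queue items.
print(pool_chunking(1_000_000, 8))  # (31250, 32)
```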
If the worker function takes multiple arguments, it's a bit easier to work with the concurrent.futures package, as its map method can be passed multiple iterables:
import concurrent.futures

def worker_function(x, y):
    return x * y

if __name__ == '__main__':
    x_values = (1, 2, 3)
    y_values = (9, -2, -8)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(worker_function, x_values, y_values))
We must use the starmap method with the multiprocessing package, and the arguments must be "zipped" together if there are separate iterables for each argument:
import multiprocessing

def worker_function(x, y):
    return x * y

if __name__ == '__main__':
    x_values = (1, 2, 3)
    y_values = (9, -2, -8)
    with multiprocessing.Pool() as pool:
        results = pool.starmap(worker_function, zip(x_values, y_values))
You do not have to use the zip built-in function if the arguments are already combined together as follows:
import multiprocessing

def worker_function(x, y):
    return x * y

if __name__ == '__main__':
    args = (
        (1, 9),  # first x, y pair of arguments
        (2, -2),
        (3, -8)
    )
    with multiprocessing.Pool() as pool:
        results = pool.starmap(worker_function, args)
Getting Task Results Back As Soon As They Are Produced
When submitting a batch of tasks, you sometimes want to get the task results (i.e. return values) as soon as they become available. Both facilities provide notification that a result from a submitted task is available via callback mechanisms:
Using multiprocessing.Pool:
import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
The same can be done, albeit awkwardly, using a callback with concurrent.futures:
import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()
Here each task is individually submitted, and a Future instance is returned for it. Then the callback must be added to the Future. Finally, when the callback is invoked, the argument passed is the Future instance for the completed task, and its result method must be called to get the actual return value. But with the concurrent.futures module, there is actually no need to use a callback at all. You can use the as_completed function:
import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()
And it is easy to tie the return value back to the original argument passed to worker_process by using a dictionary to hold the Future instances:
import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()
multiprocessing.Pool has methods imap and imap_unordered, the latter of which allows task results to be returned in arbitrary order, though not necessarily in completion order. These methods are considered to be lazier versions of map. With method map, if the passed iterable argument does not have a __len__ attribute, it will first be converted to a list, and its length will be used to compute an effective chunksize value if None was supplied as the chunksize argument. Therefore, you cannot achieve any storage optimizations by using a generator or generator expression as the iterable. But with methods imap and imap_unordered, the iterable can be a generator or generator expression; it will be iterated as necessary to produce new tasks for submission. This, however, requires the default chunksize to be 1, since the length of the iterable cannot in general be known. That doesn't stop you from providing a reasonable value yourself, using the same algorithm that the multiprocessing.Pool class uses, if you have a good approximation of the length of the iterable (or the exact size, as in the example below):
import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()
But with imap_unordered there is no easy way to tie a result to a submitted job unless the worker function returns the original call arguments along with the return value. On the other hand, the ability to specify a chunksize with imap_unordered and imap (for which the results will be in a predictable order) should make these methods more efficient than invoking the apply_async method repeatedly, which is essentially equivalent to using a chunksize of 1. But if you do need to process results in completion order, then to be sure you should use method apply_async with a callback function. It does, however, appear based on experimentation that if you use a chunksize value of 1 with imap_unordered, the results will be returned in completion order.
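The "return the original arguments along with the result" workaround mentioned above might look like the following sketch. The worker is a module-level function (so that a process pool could pickle it) and returns an (argument, result) pair; the names square_with_arg and tagged_squares are hypothetical. Because tagged_squares accepts any object with the Pool interface, the demo uses multiprocessing.pool.ThreadPool so it runs anywhere; passing a multiprocessing.Pool instance created under an if __name__ == '__main__': guard should run the same code in processes.

```python
from multiprocessing.pool import ThreadPool

def square_with_arg(i):
    # Return the argument together with its result so the caller can match them up,
    # even though imap_unordered yields results in arbitrary order.
    return i, i * i

def tagged_squares(pool, values, chunksize=1):
    results = {}
    for arg, square in pool.imap_unordered(square_with_arg, values, chunksize=chunksize):
        results[arg] = square  # each result arrives already labelled with its input
    return results

if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # Sorting only makes the printout deterministic; arrival order may vary.
        print(sorted(tagged_squares(pool, range(5)).items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```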
The map method of ProcessPoolExecutor is best compared to the Pool.imap method from the multiprocessing package, because they have a couple of similarities and one significant difference. The similarities: First, ProcessPoolExecutor.map will not convert passed input iterables that are generator expressions to lists in order to compute effective chunksize values; that is why its chunksize argument defaults to 1 and why, if you are passing large iterables, you should consider specifying an appropriate chunksize value. Second, ProcessPoolExecutor.map returns a result iterable that needs to be iterated to retrieve all the return values from the worker function, and these results are available as soon as they have been generated. The difference: unlike the Pool.imap method, the ProcessPoolExecutor.map method does not return its result iterable until all elements of the input iterable passed to it have been iterated and placed on the task queue, i.e. the input iterable is not lazily evaluated. It therefore follows that you cannot begin to retrieve results from the worker function until this occurs, even though many results might have been generated by the time all input tasks have been iterated and queued up. It also follows that if you can generate input faster than the worker function can produce results, the storage requirements of the input task queue can get quite large.
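The eager consumption of the input can be observed with a generator that logs every value it hands out. This sketch uses ThreadPoolExecutor instead of ProcessPoolExecutor so that it runs without spawning processes; as far as I can tell from the CPython sources, both end up in the same base-class Executor.map, which submits every task before returning (ProcessPoolExecutor.map first groups the inputs into chunks). The names recording_range, slow_square and demo are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def recording_range(n, log):
    # Hypothetical input generator that records every value it produces.
    for i in range(n):
        log.append(i)
        yield i

def slow_square(i):
    time.sleep(0.01)  # make the workers slower than the input generator
    return i * i

def demo(n=20):
    log = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(slow_square, recording_range(n, log))
        # map() has returned but no result has been retrieved yet;
        # nevertheless the whole input iterable has already been consumed.
        consumed_before_first_result = len(log)
        squares = list(results)
    return consumed_before_first_result, squares

if __name__ == '__main__':
    print(demo())
```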
Submitting a Task and Blocking Until It Is Completed
The multiprocessing.Pool class has a method apply that submits a task to the pool and blocks until the result is ready. The return value is just the return value from the worker function passed to the apply method. For example:
import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()
The concurrent.futures.ProcessPoolExecutor class has no such equivalent. You have to issue a submit and then a call to result against the returned Future instance. It's not a hardship to have to do this, but the Pool.apply method is more convenient for use cases where a blocking task submission is appropriate. One such case is processing that calls for threading because most of the work being done in the threads is heavily I/O-bound, except perhaps for one function that is very CPU-bound. The main program that creates the threads first creates a multiprocessing.Pool instance and passes it as an argument to all the threads. When the threads need to call the heavily CPU-bound function, they run it using the Pool.apply method, thereby running the code in another process and freeing the current process to allow the other threads to run.
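The thread-plus-pool arrangement just described could be sketched like this. cpu_heavy, io_thread and run_threads are hypothetical names, and the I/O part is only a placeholder comment. The pool is passed in as an argument, so the same code works with a multiprocessing.Pool (the intended case) or, for a quick check, a multiprocessing.pool.ThreadPool. With a ProcessPoolExecutor the equivalent blocking call would be executor.submit(cpu_heavy, n).result().

```python
import threading

def cpu_heavy(n):
    # Hypothetical CPU-bound function: the sum of the squares below n.
    return sum(i * i for i in range(n))

def io_thread(pool, n, results, lock):
    # ... the thread's heavily I/O-bound work (network, disk, ...) would go here ...
    # Pool.apply blocks only this thread while cpu_heavy runs in a pool worker;
    # with a process pool, the computation no longer competes for this process's GIL.
    value = pool.apply(cpu_heavy, (n,))
    with lock:
        results[n] = value

def run_threads(pool, sizes):
    results, lock = {}, threading.Lock()
    threads = [threading.Thread(target=io_thread, args=(pool, n, results, lock))
               for n in sizes]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In the intended setup the main program would create the pool under the main guard, e.g. with multiprocessing.Pool() as pool: run_threads(pool, [10, 100]), which should return a dict equal to {10: 285, 100: 328350} (the key order depends on which thread finishes first).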
Multiprocessing or Multithreading?
A big deal has been made of the concurrent.futures module having two classes, ProcessPoolExecutor and ThreadPoolExecutor, with identical interfaces. That is a nice feature. But the multiprocessing module also has a long-undocumented ThreadPool class with an interface identical to that of Pool:
>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>
Note that you can also perform multithreading with:
# This Pool is a function with the same interface as the
# multiprocessing.pool.ThreadPool.__init__ initializer and returns a
# multiprocessing.pool.ThreadPool instance:
from multiprocessing.dummy import Pool
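Because the interfaces match, switching between threads and processes can come down to changing a single class. In this sketch the helpers squares_via_executor and squares_via_pool (hypothetical names) take the pool class as a parameter; the demo passes the thread-based classes, and passing ProcessPoolExecutor or multiprocessing.Pool instead (from under a main guard) should run the same code in processes.

```python
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def squares_via_executor(executor_cls, values):
    # Works unchanged with ThreadPoolExecutor or ProcessPoolExecutor.
    with executor_cls(max_workers=4) as executor:
        return list(executor.map(square, values))

def squares_via_pool(pool_cls, values):
    # Works unchanged with ThreadPool, multiprocessing.dummy.Pool or multiprocessing.Pool.
    with pool_cls(4) as pool:
        return pool.map(square, values)

if __name__ == '__main__':
    print(squares_via_executor(ThreadPoolExecutor, range(5)))  # [0, 1, 4, 9, 16]
    print(squares_via_pool(ThreadPool, range(5)))              # [0, 1, 4, 9, 16]
```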
Submitting Tasks One at a Time and Timeouts
You can submit single tasks with either ProcessPoolExecutor.submit, which returns a Future instance, or Pool.apply_async, which returns an AsyncResult instance, and specify a timeout value for retrieving the result:
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")
Prints:
hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.
The main process, when calling future.result(3), will get a TimeoutError exception after 3 seconds because the submitted task has not completed within that time period. But the task continues to run, tying up the process, and the with ProcessPoolExecutor(1) as pool: block never exits, so the program does not terminate.
from multiprocessing import Pool, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")
Prints:
hanging
hanging
hanging
timeout
return from main()
This time, however, even though the timed-out task is still running and tying up the process, the with block is not prevented from exiting, and thus the program terminates normally. The reason is that the context manager for the Pool instance calls terminate when the block exits, which results in the immediate termination of all processes in the pool. Contrast this with the context manager for the ProcessPoolExecutor instance, which calls shutdown(wait=True) to await the termination of all processes in the pool when the block it governs exits. The advantage would seem to go to multiprocessing.Pool if you are using context managers to handle pool termination and the possibility of a timeout exists. Update: In Python 3.9, a new argument, cancel_futures, was added to the shutdown method. Consequently, you can cancel any tasks waiting to run (but not tasks already executing) if you explicitly call shutdown(cancel_futures=True) instead of relying on the default behavior resulting from the implicit call to shutdown when using a context manager.
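Here is a sketch of shutdown(cancel_futures=True) in action. It uses ThreadPoolExecutor with a single worker so that the outcome is deterministic: an Event (started) guarantees the first task is already executing before shutdown is called, so only the three queued tasks get cancelled. ProcessPoolExecutor.shutdown accepts the same argument, but with processes the split between cancelled and already-started tasks depends more on timing. slow_task, quick_task and shutdown_demo are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task(started):
    started.set()    # signal that this task is now executing
    time.sleep(0.5)  # work that cannot be interrupted once it has started
    return 'finished'

def quick_task(i):
    return i

def shutdown_demo():
    started = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(slow_task, started)
    waiting = [executor.submit(quick_task, i) for i in range(3)]
    started.wait()  # make sure the first task is running before shutting down
    # Cancel everything still sitting in the queue, then wait for the running task.
    executor.shutdown(wait=True, cancel_futures=True)
    return running.result(), [f.cancelled() for f in waiting]

if __name__ == '__main__':
    print(shutdown_demo())  # ('finished', [True, True, True])
```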
But since the context manager for multiprocessing.Pool only calls terminate and not close followed by join, you must ensure that all the jobs you have submitted have completed before exiting the with block: for example, by submitting jobs with a blocking, synchronous call such as map, by calling get on the AsyncResult object returned by a call to apply_async, by iterating the results of the call to imap, or by calling close followed by join on the pool instance.
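The last of those options, calling close and join before the with block ends, might look like this when tasks are submitted with apply_async. The sketch uses multiprocessing.pool.ThreadPool, which inherits Pool's terminate-on-exit context manager, so it runs without spawning processes; slow_double and collect are hypothetical names.

```python
import time
from multiprocessing.pool import ThreadPool

def slow_double(x):
    time.sleep(0.05)  # simulate work
    return 2 * x

def collect(values):
    collected = []
    with ThreadPool(2) as pool:
        for v in values:
            # The callback runs in the pool's result-handler thread.
            pool.apply_async(slow_double, (v,), callback=collected.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for every submitted task (and its callback) to finish
    # Only now does the context manager call terminate(), after all the work is done.
    return sorted(collected)

if __name__ == '__main__':
    print(collect(range(6)))  # [0, 2, 4, 6, 8, 10]
```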
Although there is no way to exit until timed-out tasks complete when using the ProcessPoolExecutor
, you can cancel the starting of submitted tasks that are not already running. In the following demo we have a pool of size 1 so that jobs can only run consecutively. We submit 3 jobs one after another where the first two jobs take 3 seconds to run because of calls to time.sleep(3)
. We immediately try to cancel the first two jobs. The first attempt of canceling fails because the first job is already running. But because the pool only has one process, the second job must wait 3 seconds for the the first job to complete before it can start running and therefore the cancel succeeds. Finally, job 3 will begin and end almost immediately after job 1 completes, which will be approximately 3 seconds after we started the job submissions:
from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()
Prints:
False
True
Done 1
Hello
3.1249606609344482
Upvotes: 117
Reputation: 1809
In addition to the detailed lists of differences in other answers, I've personally run into an unfixed (as of 2022-11-20) indefinite hang that can happen with multiprocessing.Pool when one of the workers crashes in certain ways. (In my case it was an exception from a Cython extension, though others say this can happen when a worker gets a SIGTERM, etc.) According to the documentation for ProcessPoolExecutor, it has been robust to this since Python 3.3: when a worker process terminates abruptly, a BrokenProcessPool error is raised instead.
Upvotes: 9
Reputation: 20185
I love concurrent.futures, mainly because its map accepts an iterable per function parameter: multiprocessing is somewhat hacky when it comes to passing multiple arguments to a function (there is no istarmap() equivalent of starmap()):
import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

if __name__ == '__main__':
    with mp.Pool() as pool:
        r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))
    print(r)
I find imap()/imap_unordered() super helpful for progress bars like tqdm or time estimates for larger computations. In concurrent.futures, this is super handy:
import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

if __name__ == '__main__':
    o = dict()  # dict for output
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future]
            o[i] = future.result()
    print(o)
I also love the handy result mapping as a dict. :)
With tqdm you can easily add a progress bar:
for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
Upvotes: 6
Reputation: 4665
concurrent.futures gives you more control, for example:
# Created by [email protected] at 2021/10/19 10:37
import concurrent.futures
import multiprocessing.pool
import random
import threading
import time

def hello(name):
    time.sleep(random.random())
    return f"Hello {name} {threading.current_thread()} "

print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result())
Example output:
ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>
ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
Upvotes: 0
Reputation: 99
In my experience, I faced a lot of issues with the multiprocessing module compared to concurrent.futures (but this was on Windows).
Two of the main differences I could see were:
Example: (Fetching the result)
with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed)
    print(f1.result())
So if you are returning any value from some_function(), you can directly catch/store it using f1.result(). The very same thing will need additional steps in the "multiprocessing" module.
If you are running on Linux, the hangs might not occur, but the execution complexity is still greater with the "multiprocessing" module.
Having said this, it is also important to note that my tasks were highly CPU-intensive.
On a personal note, I would recommend concurrent.futures.
Upvotes: 2
Reputation: 70735
I wouldn't call concurrent.futures more "advanced" - it's a simpler interface that works very much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.
So, like virtually all instances of "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interfaces won't.
So far as CPU-bound tasks go, that's way too under-specified to say anything meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much (if any) of a speedup you get depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives - the high-level API you use to get at those isn't a primary factor in bottom-line speed.
Edit: example
Here's the final code shown in the article you referenced, but I'm adding an import statement needed to make it work:
from concurrent.futures import ProcessPoolExecutor

def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}
Here's exactly the same thing using multiprocessing instead:
import multiprocessing as mp

def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}
Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.
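Neither snippet defines factorize_naive, which comes from the referenced article. The following is a minimal trial-division stand-in, offered purely as an assumption (the article's own version may differ in details). Defined at module level next to either function above, it would let pool_factorizer_map or mp_factorizer_map run when called from under an if __name__ == '__main__': guard.

```python
def factorize_naive(n):
    """Hypothetical stand-in: return the prime factors of n (n >= 1) in ascending order."""
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:  # divide out each prime factor as often as it occurs
            factors.append(p)
            n //= p
        p += 1
    if n > 1:              # whatever is left over is itself prime
        factors.append(n)
    return factors

print(factorize_naive(360))  # [2, 2, 2, 3, 3, 5]
```

With this definition, mp_factorizer_map([12, 97], 2) should return {12: [2, 2, 3], 97: [97]}.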
As for which one is easier to work with, they're essentially identical.
One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.
Again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it". A project sticking exclusively (if possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.
Upvotes: 249