Reputation: 201
I have a function that is both heavily I/O-dependent and CPU-intensive. I tried to parallelize it with multiprocessing plus multithreading, but it gets stuck. This question has been asked before, but in a different setting. My function is fully independent and returns nothing. Why does it get stuck, and how can it be fixed?
import concurrent.futures
import os
import numpy as np
import time

ids = [1, 2, 3, 4, 5, 6, 7, 8]

def f(x):
    time.sleep(1)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    slices = np.array_split(AccountNumbers, n_threads)
    slices = [list(i) for i in slices]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(f, slices)

def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
    slices = np.array_split(AccountNumbers, n_processors)
    slices = [list(i) for i in slices]
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map(lambda x: multithread_accounts(x, f, n_threads=n_threads), slices)

parallelize_distribute(ids, f, n_processors=2, n_threads=2)
Upvotes: 1
Views: 1620
Reputation: 70582
Sorry, but I can't make time to explain all this, so I'll just give code "that works". I urge you to start with something simpler, because the learning curve is non-trivial. Leave numpy out of it at first; stick to only threads at first; then move to only processes; and unless you're an expert don't try to parallelize anything other than named module-level functions (no, not function-local anonymous lambdas).
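For example (my illustration, not part of the original answer), the lambda the question passes to ProcessPoolExecutor.map() can never reach a worker process: every task is pickled before being shipped to a worker, and lambdas aren't picklable:

import pickle

# A lambda has no importable name, so the standard pickle module
# refuses it - which is exactly what happens to every task a
# ProcessPoolExecutor tries to send to a worker process:
try:
    pickle.dumps(lambda x: x**2)
except pickle.PicklingError as exc:
    print("lambdas can't be pickled:", exc)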
As often happens, the error messages you "should be" getting are suppressed because they occur asynchronously, so there's no good way to report them: executor.map() returns a lazy iterator, and an exception raised in a worker is only re-raised in the caller when you consume the corresponding result, which this code never does. Liberally add print() statements to see how far you're getting.
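One quick way to surface those hidden errors (my suggestion, not from the original answer): force the map() iterator's results to be consumed, e.g. by wrapping it in list(); any exception a worker hit is then re-raised in the parent:

import concurrent.futures as cf

def square(x):
    return x ** 2

if __name__ == "__main__":
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        # list() fetches every result; a failing task (e.g. an
        # unpicklable lambda in place of square) would now raise
        # here instead of vanishing silently.
        print(list(executor.map(square, range(8))))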
Note: I stripped numpy out of this, and added the stuff needed so it runs on Windows too. I expect using numpy's array_split() instead would work fine, but I didn't have numpy handy on the machine I was on at the time.
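(As an aside, not from the original answer: with numpy available, the equivalent of the helper below would presumably be a thin wrapper over numpy's array_split(), something like

import numpy as np

def array_split(xs, n):
    # np.array_split returns arrays; convert back to plain lists
    return [list(chunk) for chunk in np.array_split(list(xs), n)]

though note the elements come back as numpy scalars rather than plain ints.)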
import concurrent.futures as cf
import os
import time

def array_split(xs, n):
    # Pure-Python stand-in for numpy's array_split(): split xs into n
    # contiguous chunks whose lengths differ by at most one.
    from itertools import islice
    it = iter(xs)
    result = []
    q, r = divmod(len(xs), n)
    for i in range(r):
        result.append(list(islice(it, q + 1)))
    for i in range(n - r):
        result.append(list(islice(it, q)))
    return result

ids = range(1, 11)

def f(x):
    print(f"called with {x}")
    time.sleep(5)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        for slice in array_split(AccountNumbers, n_threads):
            executor.map(f, slice)

def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
    slices = array_split(AccountNumbers, n_processors)
    print("top slices", slices)
    with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
        # Pass parallel sequences to map(): each worker process gets one
        # slice plus f and n_threads. A named module-level function
        # pickles fine, unlike the original lambda.
        executor.map(multithread_accounts, slices,
                     [f] * len(slices),
                     [n_threads] * len(slices))

if __name__ == "__main__":
    # The guard is what makes this run on Windows: worker processes
    # re-import this module, and the guard stops them from launching
    # pools of their own.
    parallelize_distribute(ids, f, n_processors=2, n_threads=2)
BTW, I suggest this makes more sense for the threaded part:
def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        executor.map(f, AccountNumbers)
That is, there's really no need to split the list yourself here - the threading machinery will split it up itself. It's possible you missed that in your original attempts, because the ThreadPoolExecutor() call in the code you posted forgot to specify the max_workers argument.
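To see what that means in practice, here's a small demo of mine (not from the answer) showing map() spreading items across a fixed pool of threads with no manual splitting:

import concurrent.futures as cf
import threading
import time

def which_thread(x):
    time.sleep(0.1)
    return f"{x} ran on {threading.current_thread().name}"

if __name__ == "__main__":
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        # map() hands each item to whichever of the 2 worker threads
        # frees up next - no need to pre-chunk the input.
        for line in executor.map(which_thread, range(6)):
            print(line)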
Upvotes: 2