Reputation: 103
I am trying to learn the multiprocessing library in Python, but I cannot get my code to work with queue.Queue. Simply put, I have no idea where to put the queue.Queue.join() method in my code. Does it go IN the while loop or outside of it? If it goes outside of the while loop, do I write while q.not_empty? And why would I use q.not_empty when the docs explicitly say to use join()?
Here is my code. I am expecting my 4 cores to simultaneously return the number of primes computed by my function, 2 times per core, for a total of 8 computations. The prime-computing function works without problem.
import multiprocessing
import queue

def main():
    q = queue.Queue()
    [q.put((compute_primes, (1, 30000))) for _ in range(8)]
    with multiprocessing.Pool(processes=4) as pool:
        while q.not_empty:
            result = q.get()
            function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
            function.get()
        q.join()
With the code above, I break out of the loop if the queue is empty. But this is supposed to be unreliable, and why would I need q.join() afterwards?
With the code below, I can't break out of the loop. The changes are while True and the position of q.join():
def main():
    q = queue.Queue()
    [q.put((compute_primes, (1, 30000))) for _ in range(8)]
    with multiprocessing.Pool(processes=4) as pool:
        while True:
            result = q.get()
            function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
            function.get()
            q.join()
Where should I put q.join()?
P.S. This code also does not parallelise the tasks effectively; it essentially computes the functions one by one, and I cannot understand why, but this is a different problem.
P.S. 2
Code for the prime function:
import time

def compute_primes(start, end):
    start_time = time.time()
    primes = []
    for number in range(start, end + 1):
        flag = True
        for i in range(2, number):
            if (number % i) == 0:
                flag = False
                break
        if flag:
            primes.append(number)
    end_time = time.time()
    print(f"Time taken: {end_time - start_time}\n"
          f"Amount primes: {len(primes)}")
    return primes
Upvotes: 0
Views: 1689
Reputation:
I prefer not to specify an argument to the Pool constructor unless, for some reason, I want very few concurrent processes. When Pool is constructed without an argument, the number of worker processes defaults to the number of CPUs, so it will vary from computer to computer. Here's how I would implement your task (assuming I understand your use case completely):
from multiprocessing import Pool

def genPrime():  # prime number generator
    D = {}
    q = 2
    while True:
        if q not in D:
            yield q
            D[q * q] = [q]
        else:
            for p in D[q]:
                D.setdefault(p + q, []).append(p)
            del D[q]
        q += 1

def compute_primes(n):
    g = genPrime()
    return [next(g) for _ in range(n)]

NCOMPUTATIONS = 8
NPRIMES = 30_000

def main():
    with Pool() as pool:
        ar = []
        for _ in range(NCOMPUTATIONS):
            ar.append(pool.apply_async(compute_primes, [NPRIMES]))
        for _ar in ar:
            result = _ar.get()  # waits for the task to finish and gets its return value
            assert len(result) == NPRIMES

if __name__ == '__main__':
    main()
[ Please note that I am not the author of the genPrime function ]
Upvotes: 1
Reputation: 5944
Running one at a time... separate problem.
Actually, this is part of the same problem. All of this implies that you are not using a multiprocessing pool managed by Pool. What you do at the moment is put all your tasks in a queue, get them straight back out again, and then process them one at a time using a pool, which only ever gets one task at a time. These two paradigms are mutually exclusive: if you want a pool to do the work for you, you don't need a queue; if you need to handle the queue yourself, you probably don't want to use a pool.
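To see why your version runs one task at a time, note that calling .get() on the result of apply_async() blocks until that single task has finished. A minimal sketch of the difference (square is a stand-in for your compute_primes):

from multiprocessing import Pool

def square(x):  # stand-in for compute_primes
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # serialised: each task is awaited before the next is submitted
        slow = [pool.apply_async(square, (n,)).get() for n in range(8)]

        # parallel: submit all tasks first, then collect the results
        handles = [pool.apply_async(square, (n,)) for n in range(8)]
        fast = [h.get() for h in handles]

        assert slow == fast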
multiprocessing.Pool and its accompanying methods spawn the right number of worker processes, serialise your function to them, and then set up a queue internally to handle sending tasks and getting results. This is much easier than doing it manually, and is normally the right way to go about things.
When you use a pool, you do something like this (note starmap, which unpacks each (start, end) tuple into the two arguments compute_primes expects):
results = pool.starmap(compute_primes, [(0, 100_000) for _ in range(8)])
which will block until the whole pool has finished, or:
results = pool.starmap_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait()  # wait for all tasks to finish
unless you plan to process the results as they come in, in which case you don't use results.wait() at all; instead keep one handle per task and get() each one, which blocks until that particular task is done:
handles = [pool.apply_async(compute_primes, (0, 100_000)) for _ in range(8)]
for handle in handles:
    result = handle.get()
    do_stuff(result)
You do use pool.close() followed by pool.join() just to make sure the pool is shut down gracefully, but this has nothing to do with getting your results.
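For completeness, a minimal sketch of that shutdown pattern (note that the with Pool() form used above calls terminate() on exit instead, which is fine once the results have already been collected):

from multiprocessing import Pool

def work(x):
    return x + 1

if __name__ == '__main__':
    pool = Pool()
    results = pool.map(work, range(10))
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for the worker processes to exit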
Your first example works, because you do this:
result = q.get()
...
function.get()
i.e. you pull each task off the queue yourself and wait for its result directly, so the queue machinery plays no useful part.
Your second example fails, because you do this:
function.get()
q.join()
inside the loop: q.join() blocks until q.task_done() has been called once for every item that was put on the queue, and your code never calls q.task_done(), so the join never returns and you can't break out of the loop.
In this case, you don't want a queue at all.
Aside: where are you getting your Queue from? multiprocessing.Queue is not joinable; you need multiprocessing.JoinableQueue for that. And queue.Queue (the queue used with threading, which is what your import gives you) should not be used with multiprocessing at all.
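A quick sketch of the difference: only JoinableQueue has the task_done()/join() pair.

from multiprocessing import Queue, JoinableQueue

q = Queue()
# q.task_done()  # AttributeError: plain multiprocessing.Queue has no task_done()
# q.join()       # AttributeError: ...and no join() either

jq = JoinableQueue()
jq.put("task")
jq.get()
jq.task_done()  # mark the fetched item as finished
jq.join()       # returns at once: every put() has a matching task_done()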
When do you use a task queue? When you don't just want to apply a bunch of arguments to a bunch of functions. Perhaps you want to use a custom class. Perhaps you want to do something funny. Perhaps you want to do some things with one type of argument, but something else if the argument is of a certain kind, and the code is better organised this way. In these cases, subclassing Process (or Thread for multithreading) yourself may be clearer and cleaner; a sketch of that pattern follows. None of that seems to apply in this case.
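A minimal sketch of that subclassing pattern (the Worker class and its queue handling are illustrative, not taken from your code):

from multiprocessing import Process, JoinableQueue

class Worker(Process):
    def __init__(self, tasks):
        super().__init__()
        self.tasks = tasks

    def run(self):
        while True:
            task = self.tasks.get()
            if task is None:  # sentinel: no more work
                self.tasks.task_done()
                break
            print(f"processing {task}")
            self.tasks.task_done()

if __name__ == '__main__':
    tasks = JoinableQueue()
    workers = [Worker(tasks) for _ in range(4)]
    for w in workers:
        w.start()
    for t in range(8):
        tasks.put(t)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    tasks.join()  # blocks until every task has been marked done
    for w in workers:
        w.join()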
join
join() with queues is for task queues. It blocks until every task in the queue has been marked as done with task_done(). This is handy for when you want to offload some processing to a bunch of processes but wait for all of it to finish before you do anything else, so you normally do something like:
tasks = JoinableQueue()
for t in qs:
    tasks.put(t)
start_multiprocessing()  # dummy fn
tasks.join()  # wait for everything to be done
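For tasks.join() to ever return, every get() in the workers has to be matched by a task_done(). A minimal sketch of what the workers behind a dummy like start_multiprocessing() would have to do (do_work is a placeholder too):

def worker(tasks):
    while True:
        t = tasks.get()
        try:
            do_work(t)  # placeholder for the actual processing
        finally:
            tasks.task_done()  # without this, tasks.join() blocks forever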
However, in this case you don't do that, nor do you want to.
Upvotes: 1