Tary

Reputation: 103

How to successfully utilize Queue.join() with multiprocessing?

I am trying to learn the multiprocessing library in Python, but I cannot get my code to work with queue.Queue. Simply put, I have no idea where to put the queue.Queue.join() method in my code. Does it go IN the while loop or outside of it? If it goes outside of the while loop, do I write while q.not_empty? And why would I use q.not_empty when the docs explicitly say to use join()?

Here is my code. I am expecting my 4 cores to simultaneously return the number of primes computed by my function, 2 computations per core, for a total of 8 computations. The prime-computing function itself works without problems.

import multiprocessing
import queue

def main():
    q = queue.Queue()
    [q.put((compute_primes, (1, 30000))) for _ in range(8)]
    with multiprocessing.Pool(processes=4) as pool:
        while q.not_empty:
            result = q.get()
            function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
            function.get()
        q.join()

With the code above, I break out of the loop when the queue is empty. But this is supposedly unreliable, and why would I need q.join() afterwards?

With the code below, I can't break out of the loop. The changes are the while True and the position of q.join():

def main():
    q = queue.Queue()
    [q.put((compute_primes, (1, 30000))) for _ in range(8)]
    with multiprocessing.Pool(processes=4) as pool:
        while True:
            result = q.get()
            function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
            function.get()
            q.join()

Where should I put q.join?

P.S. This code also does not parallelise the tasks effectively; it essentially computes the functions one by one, and I cannot understand why, but this is a different problem.

P.S. 2

Code for prime function

import time

def compute_primes(start, end):
    start_time = time.time()
    primes = []
    for number in range(start, end + 1):
        flag = True
        for i in range(2, number):
            if (number % i) == 0:
                flag = False
                break
        if flag:
            primes.append(number)
    end_time = time.time()
    print(f"Time taken: {end_time - start_time}\n"
          f"Amount primes: {len(primes)}")
    return primes

Upvotes: 0

Views: 1689

Answers (2)

user2668284

Reputation:

I prefer not to specify an argument to the Pool constructor unless, for some reason, I want very few concurrent processes. When Pool is constructed without an argument, the number of potential concurrent processes varies from computer to computer, depending on its CPU architecture. Here's how I would implement your task (assuming I understand your use-case completely):

from multiprocessing import Pool


def genPrime():  # prime number generator
    D = {}
    q = 2
    while True:
        if q not in D:
            yield q
            D[q * q] = [q]
        else:
            for p in D[q]:
                D.setdefault(p + q, []).append(p)
            del D[q]
        q += 1


def compute_primes(n):
    g = genPrime()
    return [next(g) for _ in range(n)]


NCOMPUTATIONS = 8
NPRIMES = 30_000


def main():
    with Pool() as pool:
        ar = []
        for _ in range(NCOMPUTATIONS):
            ar.append(pool.apply_async(compute_primes, [NPRIMES]))
        for _ar in ar:
            result = _ar.get() # waits for process to terminate and get its return value
            assert len(result) == NPRIMES


if __name__ == '__main__':
    main()

[ Please note that I am not the author of the genPrime function ]
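For reference, a Pool constructed with no processes argument sizes itself from os.cpu_count(); a quick way to check what a given machine would get:

import os

# multiprocessing.Pool() with no processes argument starts
# os.cpu_count() worker processes, so parallelism adapts to the machine.
print(f"Default pool size on this machine: {os.cpu_count()}")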

Upvotes: 1

2e0byo

Reputation: 5944

Queues and Pools

Running one at a time... separate problem.

Actually, this is part of the same problem. All of this implies that you are not really letting Pool manage the work for you. What you do at the moment is put all your tasks in a queue, take them straight back out again, and then process them one at a time through a pool that only ever receives one task at a time. These two paradigms are mutually exclusive: if you want a pool to do the work for you, you don't need a queue; if you need to handle the queue yourself, you probably don't want a pool.

Pool

multiprocessing.Pool and its accompanying methods spawn the right number of worker processes, serialise your function to them, set up a queue internally, and handle sending tasks and collecting results. This is much easier than doing it all manually, and it is normally the right way to go about things.

When you use a pool, you do something like this:

results = pool.starmap(compute_primes, [(0, 100_000) for _ in range(8)])  # starmap unpacks each (start, end) tuple

which will block until the whole pool has finished, or:

results = pool.starmap_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait()  # block until every task has finished

unless you plan to process the results as they come in. In that case you don't call results.wait() at all; instead you keep one AsyncResult per task (e.g. from apply_async) and get each one as it finishes:

results = [pool.apply_async(compute_primes, (0, 100_000)) for _ in range(8)]
for r in results:
    do_stuff(r.get())  # blocks until this particular task has finished

You do still use pool.close() followed by pool.join() to make sure the pool is shut down gracefully, but that has nothing to do with getting your results.
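For example, a minimal sketch of that shutdown pattern (the work function here is just an illustrative stand-in):

import multiprocessing
import time

def work(x):
    time.sleep(0.1)  # stand-in for real work
    return x * x

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    results = pool.map_async(work, range(8))
    pool.close()           # no new tasks may be submitted
    pool.join()            # wait for all workers to finish and exit
    print(results.get())   # the results are still available after shutdown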

Your examples

Your first example works, because you do this:

  • put tasks in a queue
  • get them out one by one and process them
  • join an empty queue -> leave immediately

Your second example fails, because you do this:

  • put tasks in a queue
  • get one task out
  • wait for every task to be marked done -> blocks indefinitely, because nothing ever calls task_done()

In this case, you don't want a queue at all; see the sketch below, which uses only the pool.
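For instance, a minimal sketch of the question's task with the queue removed, assuming the compute_primes(start, end) signature from the question (the body below is a simplified stand-in):

import multiprocessing

def compute_primes(start, end):
    # simplified stand-in for the question's function
    return [n for n in range(max(start, 2), end + 1)
            if all(n % i for i in range(2, int(n ** 0.5) + 1))]

def main():
    with multiprocessing.Pool(processes=4) as pool:
        # starmap unpacks each (start, end) tuple into two arguments;
        # the pool feeds the 8 tasks to the 4 workers as they become free
        results = pool.starmap(compute_primes, [(1, 30_000)] * 8)
    print([len(r) for r in results])

if __name__ == "__main__":
    main()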

Using Queue manually

Aside: where are you getting your Queue from? multiprocessing.Queue is not joinable; for .join() you need multiprocessing.JoinableQueue. And queue.Queue, the one you import in your example, is meant for threads within a single process and should not be used with multiprocessing at all.

When do you use a task queue? When you don't just want to apply a bunch of arguments to a bunch of functions. Perhaps you want to use a custom class. Perhaps you want to do something funny. Perhaps you want to do one thing with most arguments but something else when an argument is of a certain kind, and the code is better organised this way. In these cases, subclassing Process (or Thread for multithreading) yourself may be clearer and cleaner; a sketch of that pattern follows. None of it seems to apply in this case, though.
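For illustration, a minimal sketch of that manual pattern, with a trivial stand-in for the real work (the Worker class and the None sentinel scheme are mine, not part of the original answer):

import multiprocessing

class Worker(multiprocessing.Process):
    """Pulls (start, end) tasks from a JoinableQueue until it sees None."""

    def __init__(self, tasks, results):
        super().__init__()
        self.tasks = tasks
        self.results = results

    def run(self):
        while True:
            task = self.tasks.get()
            if task is None:              # sentinel: no more work
                self.tasks.task_done()
                break
            start, end = task
            self.results.put(sum(range(start, end)))  # stand-in for real work
            self.tasks.task_done()        # lets tasks.join() make progress

if __name__ == "__main__":
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    workers = [Worker(tasks, results) for _ in range(4)]
    for w in workers:
        w.start()
    for _ in range(8):
        tasks.put((1, 30_000))
    for _ in workers:
        tasks.put(None)                   # one sentinel per worker
    tasks.join()                          # blocks until every task_done() call
    print([results.get() for _ in range(8)])
    for w in workers:
        w.join()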

Using join with queues

.join() is for task queues. It blocks until every task in the queue has been marked as done. This is handy when you want to offload some processing to a bunch of processes but wait for them to finish before you do anything else, so you normally do something like:

tasks = JoinableQueue()
for t in task_list:  # task_list: whatever tasks you have
    tasks.put(t)
start_multiprocessing()  # dummy fn; the workers must call tasks.task_done()
tasks.join()  # wait for everything to be done

However, in this case you don't do that, and don't want to.

Upvotes: 1
