Jaqo

Reputation: 349

Python multiprocessing with an updating queue and an output queue

How can I write a Python multiprocessing script that uses two Queues like these?

  1. one as a working queue that starts with some data and that, depending on conditions of the functions to be parallelized, receives further tasks on the fly,
  2. another that gathers the results and is used to write them out after processing finishes.

I basically need to put some more tasks in the working queue depending on what I find in its initial items. The example I post below is silly (I could just transform the item as I like and put it directly in the output Queue), but its mechanics are clear and reflect part of the concept I need to develop.

Hereby my attempt:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as my machine has CPUs (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

This neither ends nor prints any result.

At the end of the whole process I would like to ensure that the working queue is empty, and that all the parallel functions have finished writing to the output queue before the latter is iterated to take out the results. Do you have suggestions on how to make it work?

Upvotes: 8

Views: 15471

Answers (2)

Jaqo

Reputation: 349

The following code achieves the expected results. It follows the suggestions made by @tawmas.

This code allows multiple cores to be used in a process that requires that the queue which feeds data to the workers can be updated by them during processing:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty():
            break # stop once the working queue runs dry (a simple stand-in for a real poison pill)
        else:
            picked = working_queue.get()
            if picked % 2 == 0:
                output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    while True:
        if output_q.empty():
            break
        results_bank.append(output_q.get_nowait())
    print len(results_bank) # this should equal the length of static_input, the range used to populate the input queue. In other words, it tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank
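
As a side note, checking empty() and then calling get() in two separate steps can race when several workers drain the queue at the same time: a worker may block on get() after another worker has just taken the last item. Below is a minimal sketch of a variant that relies on get_nowait() and catches the Queue.Empty exception instead; it assumes the same toy even/odd logic and only changes the worker:

    import multiprocessing as mp
    import Queue  # provides the Queue.Empty exception raised by get_nowait()

    def worker(working_queue, output_queue):
        while True:
            try:
                # get_nowait() raises Queue.Empty instead of blocking, so there is
                # no window between a separate empty() check and the retrieval
                picked = working_queue.get_nowait()
            except Queue.Empty:
                break
            if picked % 2 == 0:
                output_queue.put(picked)
            else:
                working_queue.put(picked + 1)

Note that, just like empty(), get_nowait() can report an empty queue while another worker is still about to put an item back, so this is not a full replacement for a proper termination signal such as the poison pill mentioned in the other answer.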

Upvotes: 5

tawmas

Reputation: 7833

You have a typo in the line that creates the processes. It should be mp.Process, not mp.process. This is what is causing the exception you get.

Also, you are not looping in your workers, so they actually only consume a single item each from the queue and then exit. Without knowing more about the required logic, it's not easy to give specific advice, but you will probably want to enclose the body of your worker function inside a while True loop and add a condition in the body to exit when the work is done.

Please note that, if you do not add a condition to explicitly exit from the loop, your workers will simply stall forever when the queue is empty. You might consider using the so-called poison pill technique to signal to the workers that they may exit. You will find an example and some useful discussion in the PyMOTW article on Communication Between Processes.
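
For illustration, here is a minimal, self-contained sketch of the poison pill idea; the sentinel value, the queue names and the squaring step are placeholders, not taken from the question:

    import multiprocessing as mp

    SENTINEL = None  # the "poison pill": a value that can never be real work

    def worker(task_queue, result_queue):
        while True:
            item = task_queue.get()          # blocks until an item arrives
            if item is SENTINEL:
                break                        # pill received: this worker exits
            result_queue.put(item ** 2)      # placeholder for real processing

    if __name__ == '__main__':
        tasks = mp.Queue()
        results = mp.Queue()
        for i in range(10):
            tasks.put(i)
        num_workers = mp.cpu_count()
        for _ in range(num_workers):
            tasks.put(SENTINEL)              # one pill per worker
        workers = [mp.Process(target=worker, args=(tasks, results))
                   for _ in range(num_workers)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

Since a FIFO queue delivers the pills only after all the initial tasks, this works as long as the workers do not put new tasks back on the queue; with the re-queueing logic from the question, the stop condition needs more care.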

As for the number of processes to use, you will need to benchmark a bit to find what works for you, but, in general, one process per core is a good starting point when your workload is CPU bound. If your workload is IO bound, you might have better results with a higher number of workers.

Upvotes: 3
