Dschoni

Reputation: 3862

Joining multiprocessing queue takes a long time

In Python 2.7 I have implemented a multiprocessing scenario with multiple queues and consumers. The simplified idea is that I have a producer of jobs, which are fed to a consumer that handles the jobs, and an error handler that does all the logging. Very simplified, it looks something like this:

import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try: 
            element = job_queue.get_nowait()
            print element
        except:
# t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
# t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target = error_handler, args = (error_queue, ))
    p1.start()
    p2 = mp.Process(target = job_handler, args = (job_queue, error_queue))
    p2.start()

This basically works, but in my more complex program there is a very long time difference between the two comment markers t1 and t2 (about 5 minutes). So I have two questions:

  1. Do I understand correctly that every process should call close() and join_thread() on all Queue objects it used, to indicate that it is done using them? I think that subprocesses do that implicitly when I end them, for example by returning, as stated here:

join_thread() Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.

  2. How can I figure out why the join process takes such a long time?

Upvotes: 5

Views: 5836

Answers (5)

Julienm

Reputation: 188

Yes, if your output queue (in your case the error queue) is too heavy, you get seconds of overhead when joining the processes. I recommend using a multiprocessing.Manager().list() instead. I added an example here. On my side, it reduced the downtime to join the processes from 5 s to 0.8 s.

I even wanted to use the same approach for my input queue, to eliminate the remaining ~0.8 s of overhead, by using a list plus a multiprocessing.Value as index, but joining would then again take 5+ seconds.
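
For illustration, here is a minimal sketch of the Manager-list approach described above, adapted to the question's Python 2.7 code (the names and the replacement of error_queue are my own, not the original answer's example):

import multiprocessing as mp
import Queue

def job_handler(job_queue, errors):
    # Errors are appended to a Manager-backed list instead of a second
    # mp.Queue, so joining this process does not wait on a queue feeder thread.
    while True:
        try:
            element = job_queue.get_nowait()
            print element
        except Queue.Empty:
            errors.append('Error')
            return

if __name__ == '__main__':
    manager = mp.Manager()
    errors = manager.list()              # shared list replaces error_queue
    job_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)
    p2 = mp.Process(target=job_handler, args=(job_queue, errors))
    p2.start()
    p2.join()                            # no error_queue feeder thread to flush
    print list(errors)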

Upvotes: 0

Zephyr Sails

Reputation: 53

Before joining a process that has put items on a queue, you should empty the queue first; otherwise a deadlock will be introduced.

The following is copied from the Python multiprocessing docs.

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).
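
For clarity, this is the corrected version with the last two lines swapped (drain the queue first, then join):

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()           # drain the queue first ...
    p.join()                    # ... then the join no longer deadlocks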

Upvotes: 2

stovfl

Reputation: 15513

I have a scenario that reproduces your delay between the points # t1 and # t2.

At # t1 the Queue is full. Process p2 has to wait until all the buffered items are fed by the “feeder” thread to the underlying pipe.

Warning: If p2 cannot put all messages in the Queue, this becomes a deadlock.

At the time p2 terminates, the error_handler is still fetching messages from the Queue.

Note: For my environment, as this is OS dependent, I have to put at least 3,500 items in the Queue to get this behavior.

This is the Profiling output:

Starting  
Start Error Handler  
Start Job handler  
main blocked until p2 terminates  
job=0  job=1  job=2  job=3
except job_handler  
    # t1  
    error_queue.put('Error') * 3500  
        error_handler result[1]=Error  
    close error_queue  
        error_handler result[100]=Error  
    # t2 delayed 0:00:02.318323  
exit(1) job_handler  
p2 terminates with exitcode=1  
    job_queue has 5 outstanding jobs, empty=False  
    get outstanding job 5,6,7,8,9  
        error_handler result[1000]=Error  
        error_handler result[2000]=Error  
        error_handler result[3000]=Error  
exit error_handler got 3500 result=Error  
p1 terminates with exitcode=0  
    error_queue has 0 outstanding message(s), empty=True  
END __main__  

Process finished with exit code 0
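
A minimal timing sketch along the same lines (my own illustration, not the answerer's instrumentation): put many items on error_queue, then measure how long the span between # t1 and # t2 takes while error_handler drains the queue.

import multiprocessing as mp
import time

N = 3500    # threshold is OS dependent

def job_handler(error_queue):
    for _ in range(N):
        error_queue.put('Error')
    t1 = time.time()                 # t1
    error_queue.close()
    error_queue.join_thread()        # blocks until the feeder thread has flushed everything
    print 'delay t1 -> t2: %.3f s' % (time.time() - t1)   # t2

def error_handler(error_queue):
    # If this consumer stopped early, join_thread() above could block forever
    # once the pipe buffer is full (the deadlock warned about above).
    for _ in range(N):
        error_queue.get()

if __name__ == '__main__':
    error_queue = mp.Queue()
    p1 = mp.Process(target=error_handler, args=(error_queue,))
    p2 = mp.Process(target=job_handler, args=(error_queue,))
    p1.start()
    p2.start()
    p2.join()
    p1.join()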

Upvotes: 1

stovfl

Reputation: 15513

I found the following in the docs: docs.python.org

From the docs:
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

As I understand it, a process, here p2 = job_handler, should not exit immediately after putting items on a queue, to avoid losing queued data. I can't find any explanation for the sentence "Otherwise you cannot be sure that processes which have put items on the queue will terminate."

Besides the above, I want to comment on your code. I recognize that this code is simplified.

  1. Avoid placing code that is executed on startup outside of if __name__ == '__main__':

    From the docs: Safe importing of main module: one should protect the “entry point” of the program by using if __name__ == '__main__':

    job_queue = mp.Queue()
    error_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)

  2. .close() in def job_handler/error_handler

    except:
    ...
    job_queue.close()

    This is wrong, as the job_handler process never puts messages on this queue.
    The same applies to the error_handler process and error_queue.close()

From the docs:
Indicate that no more data will be put on this queue by the current process.
The background thread will quit once it has flushed all buffered data to the pipe.
This is called automatically when the queue is garbage collected.

  3. .join_thread() in def job_handler/error_handler
    This is useless, as the job_handler process doesn't put messages on this queue. Therefore .join_thread() does nothing. This is also true for the error_handler process.

    except:
    ...
    job_queue.join_thread()

    # t2

    def error_handler(error_queue):
        ...
        error_queue.close()
        error_queue.join_thread()

  4. Use exit(1) instead of return 1
    The error code '1' cannot be caught with p2.exitcode if the target function simply returns (see the sketch after this list).
    Think of a process more as a program of its own than as a function.

    return 1
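
A quick sketch of that difference (my own illustration): returning from the target function leaves p.exitcode at 0, while sys.exit(1) makes it 1.

import multiprocessing as mp
import sys

def with_return():
    return 1            # return value is discarded, exitcode stays 0

def with_exit():
    sys.exit(1)         # sets the process exit code to 1

if __name__ == '__main__':
    p = mp.Process(target=with_return)
    p.start()
    p.join()
    print 'return 1    -> p.exitcode = %s' % p.exitcode   # 0

    p = mp.Process(target=with_exit)
    p.start()
    p.join()
    print 'sys.exit(1) -> p.exitcode = %s' % p.exitcode   # 1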

Try the following:

# t1
error_queue.put('Error')
error_queue.close()

# Give the error_handler a chance to get a timeslice
time.sleep(0.2)

error_queue.join_thread()
#job_queue.close()
#job_queue.join_thread()
# t2
exit(1)

Tested with Python 3.4.2 and Python 2.7.9

Upvotes: 4

Bharel

Reputation: 26901

Calling .close() and .join_thread() is a recommendation, not a must. .close() is called automatically when the Queue is garbage collected, and .join_thread() is called automatically upon process termination.

Unfortunately, I ran your piece of code and got a clean termination after 5 seconds with 0-9 printed. Even when I pushed an unprintable character, I did not receive any delay. The code seems to work fine.

Regarding your more complex program, this might happen if you pass a lot of data over the queue. The queue is used for IPC, meaning the data is encoded (pickled) on one side, pushed into a pipe, and decoded on the other side. Passing a lot of data causes a slowdown. Since it eventually resolves itself, it does not seem like a deadlock.

Although it is best to avoid it, an option would be to use shared memory instead of a queue. That way, data does not need to pass between processes but simply stays in one memory segment shared by both.
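
As a rough illustration of the shared-memory idea (my own sketch using multiprocessing.Array, not something from the original answer):

import multiprocessing as mp

def worker(shared):
    # Results are written straight into shared memory; nothing is pickled
    # and pushed through a pipe, so there is no feeder thread to wait for.
    for i in range(len(shared)):
        shared[i] = i * i

if __name__ == '__main__':
    shared = mp.Array('i', 10)       # ten C ints in a shared memory segment
    p = mp.Process(target=worker, args=(shared,))
    p.start()
    p.join()                         # returns promptly; no queue to flush
    print shared[:]                  # [0, 1, 4, 9, ..., 81]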

Upvotes: 2
