Reputation: 3862
In Python 2.7 I have implemented a multiprocessing scenario with multiple queues and consumers. The simplified idea is that I have a producer of jobs, which are fed to a consumer that handles the jobs, and an error handler that does all the logging. Very simplified, it looks something like this:
import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()

for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try:
            element = job_queue.get_nowait()
            print element
        except:
            # t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
            # t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target=error_handler, args=(error_queue,))
    p1.start()
    p2 = mp.Process(target=job_handler, args=(job_queue, error_queue))
    p2.start()
This basically works, but in my more complex program there is a very long time difference between the two comment markers # t1 and # t2 (about 5 min). So I have two questions:
Do I have to call close() and join_thread() on all used Queue objects, to indicate that a process is done using them? I think that subprocesses do that implicitly when I end them, for example by returning, as stated here:

join_thread()
Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
By default, if a process is not the creator of the queue then on exit it will attempt to join the queue's background thread. The process can call cancel_join_thread() to make join_thread() do nothing.
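For reference, the cancel_join_thread() behaviour mentioned in that quote can be sketched like this (a minimal illustration, not taken from the question's program):

import multiprocessing as mp

def worker(q):
    # make the implicit join_thread() on exit a no-op; data still in the
    # buffer may be lost if nobody ever reads it
    q.cancel_join_thread()
    q.put('done')

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # read before joining, so nothing is lost here
    p.join()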
Upvotes: 5
Views: 5836
Reputation: 188
Yes, if your output queue (in your case the error queue) is too heavy, you get seconds of overhead to join the processes. I recommend you use a multiprocessing.Manager().list() instead. I added an example here. On my side, it reduced the downtime to join processes from 5 sec to 0.8 sec.
I even wanted to use the same for my input queue, to eliminate the remaining ~0.8 sec of overhead, by using a list plus a multiprocessing.Value as index, but joining would then also take +5 sec.
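A minimal sketch of that idea, assuming the error queue is replaced by a Manager().list() proxy (the names and structure here are illustrative, not the answer's actual example):

import multiprocessing as mp

def job_handler(job_queue, errors):
    # 'errors' is a Manager().list() proxy; appends go through the manager
    # process, so there is no per-process feeder thread to flush on exit
    while True:
        try:
            print(job_queue.get_nowait())
        except Exception:
            # (like the original, this relies on the jobs being flushed
            # into the pipe before the worker drains the queue)
            errors.append('Error')
            return

if __name__ == '__main__':
    manager = mp.Manager()
    errors = manager.list()          # shared list instead of error_queue
    job_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)
    p = mp.Process(target=job_handler, args=(job_queue, errors))
    p.start()
    p.join()                         # no output-queue feeder thread to wait for
    print(list(errors))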
Upvotes: 0
Reputation: 53
Before joining a process that uses a queue, you should empty the queue first; otherwise a deadlock will be introduced.
The following is copied from the Python multiprocessing docs.
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
An example which will deadlock is the following:
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
A fix here would be to swap the last two lines (or simply remove the p.join() line).
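For reference, the corrected ordering would look like this:

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()           # drain the queue first ...
    p.join()                    # ... then joining is safe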
Upvotes: 2
Reputation: 15513
Here is a scenario to reproduce your delay between the points # t1 and # t2.
At # t1 the queue is full. Process p2 has to wait until all the buffered items are fed by the "feeder" thread to the underlying pipe.
Warning: If p2 cannot put all its messages into the queue, this becomes a deadlock.
At the time p2 terminates, the error_handler is still fetching messages from the queue.
Note: For my environment (this is OS-dependent) I have to put at least 3,500 items in the queue to get this behavior.
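A sketch of how the two handlers were presumably modified for this experiment (the 3,500 figure comes from the note above; the exact loop structure is an assumption):

def job_handler(job_queue, error_queue):
    while True:
        try:
            print('job=%d' % job_queue.get_nowait())
        except Exception:
            # t1: flood the error queue so the feeder thread has a lot to flush
            for _ in range(3500):
                error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()   # blocks until the pipe has been drained enough
            # t2: reached only after error_handler has read enough messages
            exit(1)

def error_handler(error_queue):
    results = 0
    while True:
        try:
            error_queue.get(timeout=2)  # keep draining until nothing arrives anymore
            results += 1
        except Exception:
            break
    print('exit error_handler got %d results' % results)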
This is the Profiling output:
Starting
Start Error Handler
Start Job handler
main blocked until p2 terminates
job=0 job=1 job=2 job=3
except job_handler
# t1
error_queue.put('Error') * 3500
error_handler result[1]=Error
close error_queue
error_handler result[100]=Error
# t2 delayed 0:00:02.318323
exit(1) job_handler
p2 terminates with exitcode=1
job_queue has 5 outstanding jobs, empty=False
get outstanding job 5,6,7,8,9
error_handler result[1000]=Error
error_handler result[2000]=Error
error_handler result[3000]=Error
exit error_handler got 3500 result=Error
p1 terminates with exitcode=0
error_queue has 0 outstanding message(s), empty=True
END __main__
Process finished with exit code 0
Upvotes: 1
Reputation: 15513
Found the following in the docs: docs.python.org
From the docs:
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
As I understand it, a process, here p2 = job_handler, should not exit immediately after putting items in a queue, to avoid losing queued data. I can't find any explanation for the sentence "Otherwise you cannot ... have put items on the queue will terminate."
Besides the above, I want to comment on your code. I realize that this code is simplified.
Avoid placing code that is executed on startup outside of if __name__ == '__main__':
From the docs: Safe importing of main module — one should protect the "entry point" of the program by using if __name__ == '__main__':
job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)
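A sketch of the suggested restructuring, with the startup code moved under the guard and the queues passed in explicitly (function bodies elided):

import multiprocessing as mp

def job_handler(job_queue, error_queue):
    pass   # body as in the question

def error_handler(error_queue):
    pass   # body as in the question

if __name__ == '__main__':
    job_queue = mp.Queue()
    error_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)
    p1 = mp.Process(target=error_handler, args=(error_queue,))
    p1.start()
    p2 = mp.Process(target=job_handler, args=(job_queue, error_queue))
    p2.start()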
.close() in def job_handler / error_handler

except:
    ...
    job_queue.close()

This is wrong, as the job_handler process never puts messages on this queue. The same applies to the error_handler process and error_queue.close().
From the docs:
Indicate that no more data will be put on this queue by the current process.
The background thread will quit once it has flushed all buffered data to the pipe.
This is called automatically when the queue is garbage collected.
.join_thread() in def job_handler / error_handler

except:
    ...
    job_queue.join_thread()
    # t2

def error_handler(error_queue):
    ...
    error_queue.close()
    error_queue.join_thread()

This is useless, as the job_handler process doesn't put messages on job_queue, so .join_thread() does nothing there. The same is true for the error_handler process and error_queue.join_thread().
Use exit(1) instead of return 1

The error code 1 cannot be caught with p2.exitcode if you only return it. Think of a process more as a program of its own than as a function.
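A quick illustration of the difference (a hypothetical snippet, using sys.exit(1), which is equivalent to exit(1) here):

import multiprocessing as mp
import sys

def with_return():
    return 1            # the return value is simply ignored

def with_exit():
    sys.exit(1)         # becomes the process exit status

if __name__ == '__main__':
    p = mp.Process(target=with_return)
    p.start(); p.join()
    print(p.exitcode)   # prints 0
    p = mp.Process(target=with_exit)
    p.start(); p.join()
    print(p.exitcode)   # prints 1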
Try the following (note that time.sleep requires import time):
# t1
error_queue.put('Error')
error_queue.close()
# Give the error_handler a chance to get a timeslice
time.sleep(0.2)
error_queue.join_thread()
#job_queue.close()
#job_queue.join_thread()
# t2
exit(1)
Tested with Python 3.4.2 and Python 2.7.9.
Upvotes: 4
Reputation: 26901
Calling .close() and .join_thread() is a recommendation; however, it is not a must: .close() is called automatically when the Queue is garbage collected, and .join_thread() is called automatically upon process termination.
Unfortunately, I ran your piece of code and got a clean termination after 5 seconds with 0-9 printed. Even when I pushed an unprintable character, I did not receive any delay. The code seems to run smoothly.
Regarding your more complex program, the delay can happen if you pass a lot of data over the queue. The queue is used for IPC, meaning the data is encoded on one side, pushed into a pipe, and decoded on the other side. Passing a lot of data causes a slowdown. Since it eventually resolves itself, it does not seem like a deadlock.
Although it is best to avoid it, an option would be to use shared memory instead of a queue. That way the data does not need to pass between processes but just stays in one memory segment shared by both.
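A minimal sketch of that idea, assuming a simple shared error counter (mp.Value) stands in for the error queue; the names and structure are illustrative, not from this answer:

import multiprocessing as mp

def job_handler(job_queue, error_count):
    while True:
        try:
            print(job_queue.get_nowait())
        except Exception:
            # record the error in shared memory instead of pushing it through a pipe
            with error_count.get_lock():
                error_count.value += 1
            return

if __name__ == '__main__':
    error_count = mp.Value('i', 0)   # lives in shared memory, no feeder thread
    job_queue = mp.Queue()
    for i in range(10):
        job_queue.put(i)
    p = mp.Process(target=job_handler, args=(job_queue, error_count))
    p.start()
    p.join()
    print('errors: %d' % error_count.value)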
Upvotes: 2