Reputation: 6045
I have a main process that spawns worker Processes; those Processes add jobs to a Queue, which the main process then uses to spawn more Processes. That works fine for about 20 seconds, and then the main process just stops spawning jobs even though there are plenty of jobs in the queue.
Here's the code for the job process's run loop:
try:
    page = self.fetch_page(self.url, self.timeout)
    if page != None:
        #do_stuff
        pass
except Exception, e: #Log any errors
    import traceback
    self.log(str(traceback.format_exc(limit=10)), level="error")
    self.log(str(e), level="error")
finally:
    import os, signal
    print "releasing Semaphore"
    self.__sem.release()
    #print "Joining pQueue" #these statements raise errors...
    #self.__pqueue.join_thread()
    #print "Joining lQueue"
    #self.__log.join_thread()
    print "exiting"
    os._exit(1)
    #os.kill(self.pid, signal.SIGTERM)
And here's the code for the main process that spawns the jobs:
while True:
    print "Waiting for url"
    url = self.page_queue.get()
    print "waiting for semaphore"
    self.__sem.acquire()
    print "semaphore received"
    process = self.process_handler(url, self.log_queue, self.__sem, self.page_queue)
    process.start()
Just a little context: self.log_queue in the spawning process is self.__log in the Job process, self.page_queue is self.__pqueue in the Job process, and self.__sem is the same object as self.__sem in the Job process.
The spawning process usually hangs at:
url = self.page_queue.get()
I'm pretty sure it has something to do with the Queues breaking when the Job Processes terminate before they finish writing to the queues, but that's just a hunch. Also, self.__pqueue.join_thread() raises an assertion error.
Upvotes: 1
Views: 560
Reputation: 6045
Ok, I got it fixed. My first hunch was that the Queues were being killed on the write. However, after checking with Queue.qsize() to make sure the queue still had values in it, I began thinking that the semaphore was causing the problem. So I looked into multiprocessing Manager objects, which let processes manipulate shared data via "proxies". I switched the logic so that all of the Queues and Semaphores are controlled by Manager objects, and that seems to have worked out perfectly. Link to the applicable Python docs: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes
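In outline, the Manager-based setup looks something like this (a minimal sketch only; the worker body and the MAX_WORKERS limit are placeholders, not the real crawler code):

from multiprocessing import Manager, Process

MAX_WORKERS = 10  # placeholder limit, not from the original code

def worker(url, page_queue, sem):
    try:
        pass  # fetch the page here and put any discovered urls on page_queue
    finally:
        sem.release()  # always give the slot back, even if fetching fails

if __name__ == "__main__":
    manager = Manager()                   # proxy objects live in the manager process
    page_queue = manager.Queue()          # Queue accessed through a proxy
    sem = manager.Semaphore(MAX_WORKERS)  # Semaphore accessed through a proxy

    page_queue.put("http://example.com")  # seed url, illustrative only
    while True:
        url = page_queue.get()
        sem.acquire()
        Process(target=worker, args=(url, page_queue, sem)).start()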
Upvotes: 0
Reputation: 45039
If you are fetching web pages, as it appears, you should consider using the eventlet library instead of multiple processes. Splitting work into multiple processes is really useful when you are doing a lot of computation, but here you are probably going to spend most of your time waiting for your Internet connection, so the extra overhead of starting processes is wasted.
Eventlet works on a cooperative threading model and makes writing this sort of application much easier.
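For example, a concurrent fetcher with eventlet can be as short as this (a sketch along the lines of the eventlet documentation; the url list is illustrative only):

import eventlet
from eventlet.green import urllib2  # green (non-blocking) version of urllib2

urls = ["http://www.google.com", "http://www.python.org"]

def fetch(url):
    return url, urllib2.urlopen(url).read()

pool = eventlet.GreenPool(200)       # up to 200 concurrent green threads
for url, body in pool.imap(fetch, urls):
    print "fetched", url, len(body), "bytes"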
Upvotes: 1
Reputation: 49
Not sure if this will be useful, but if your self.page_queue is an instance of Queue (http://docs.python.org/library/queue.html), then get() is blocking by default. Have you verified that the queue isn't empty? It might just be hung waiting for an item. I remember that plagued me when I was using Queues.
Further, the queue won't join until you have called task_done() for every task you retrieved with get().
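For example, with the standard library Queue the pattern looks like this (just a sketch, not your actual code):

import Queue

q = Queue.Queue()
q.put("http://example.com")

while True:
    try:
        url = q.get(timeout=5)  # raises Queue.Empty instead of hanging forever
    except Queue.Empty:
        break
    # ... process url ...
    q.task_done()               # required once per get() for q.join() to return

q.join()                        # only returns after every item has been task_done()'d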
Upvotes: 1