Update: with the help of dano, I solved this problem. I didn't call join() on the producers, and that made my script hang. As dano said, I only needed to add one line:
...
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.daemon = True
producer.start()
...
Old script:
import csv
import gzip
import multiprocessing
import Queue
import sys

QUEUE_SIZE = 2000

def produce(file_queue, row_queue):
    while not file_queue.empty():
        src_file = file_queue.get()
        zip_reader = gzip.open(src_file, 'rb')
        try:
            csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER)
            for row in csv_reader:
                new_row = process_sdp_row(row)
                if new_row:
                    row_queue.put(new_row)
        finally:
            zip_reader.close()

def consume(row_queue):
    '''processes all rows, once queue is empty, break the infinite loop'''
    while True:
        try:
            # takes a row from queue and process it
            pass
        except multiprocessing.TimeoutError as toe:
            print "timeout, all rows have been processed, quit."
            break
        except Queue.Empty:
            print "all rows have been processed, quit."
            break
        except Exception as e:
            print "critical error"
            print e
            break

def main(args):
    file_queue = multiprocessing.Queue()
    row_queue = multiprocessing.Queue(QUEUE_SIZE)
    file_queue.put(file1)
    file_queue.put(file2)
    file_queue.put(file3)
    # starts 3 producers
    for i in xrange(3):
        producer = multiprocessing.Process(target=produce, args=(file_queue, row_queue))
        producer.start()
    # starts 1 consumer
    consumer = multiprocessing.Process(target=consume, args=(row_queue,))
    consumer.start()
    # blocks the main process until the consumer process has finished
    consumer.join()
    # prints statistics results after consumer is done
    sys.exit(0)

if __name__ == "__main__":
    main(sys.argv[1:])
Purpose:
I am using Python 2.7 multiprocessing to start 3 producers that read 3 files at the same time and put the file lines into row_queue, and 1 consumer that does further processing on all rows. I print statistics in the main process after the consumer is done, which is why I use the join() method. Finally, I call sys.exit(0) to quit the script.
Problem: the script does not quit.
I tried replacing sys.exit(0) with print "the end", and "the end" was displayed on the console. Am I doing something wrong? Why doesn't the script quit, and how can I make it quit? Thanks
Your producers do not have the multiprocessing.Process.daemon property set:
daemon
The process’s daemon flag, a Boolean value. This must be set before start() is called.
The initial value is inherited from the creating process.
When a process exits, it attempts to terminate all of its daemonic child processes.
Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.daemon
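As a small illustration of the quoted rules, here is a sketch written for Python 3, where block_forever() and make_worker() are hypothetical helpers and block_forever() stands in for a producer stuck in put() or get(). A Process created from the main process starts with daemon == False, and the flag has to be assigned before start(). Because the worker below is daemonic, the parent terminates it at exit instead of waiting for it.

```python
import multiprocessing
import time


def block_forever():
    # Hypothetical stand-in for a producer stuck in put() or get().
    while True:
        time.sleep(1)


def make_worker(daemon):
    worker = multiprocessing.Process(target=block_forever)
    # The default is inherited (False when created from the main process);
    # it must be changed before start() is called.
    worker.daemon = daemon
    return worker


if __name__ == "__main__":
    worker = make_worker(daemon=True)
    worker.start()
    time.sleep(0.5)
    # With daemon=True the interpreter exits after this line and the worker
    # is terminated; with daemon=False the script would hang, waiting on it.
    print("main finished")
```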
Just add producer.daemon = True:
...
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.daemon = True
producer.start()
...
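That should make it possible for the whole program to end once the consumer is joined: at exit, the main process terminates its daemonic children instead of waiting for them, as it does for non-daemonic ones.
By the way, you should probably join the producers too.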
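If you do join the producers, the order matters: the multiprocessing programming guidelines warn that a process which has put items on a queue does not terminate until its feeder thread has flushed them into the pipe, so joining a producer before its rows are consumed can deadlock. Below is a minimal sketch, written for Python 3, of one common pattern (end-of-stream markers), not the only fix. It assumes a hypothetical produce() that fakes three rows per file name and one producer per file, so no shared file queue is needed. Each producer ends with a None marker, the consumer stops after one marker per producer, and the main process joins the consumer before the producers.

```python
import multiprocessing


def produce(src_file, row_queue):
    # Hypothetical stand-in for reading and processing one gzip'd CSV file.
    for i in range(3):
        row_queue.put("%s:row%d" % (src_file, i))
    row_queue.put(None)  # tells the consumer this producer is finished


def consume(row_queue, n_producers, result_queue):
    """Count rows until every producer has sent its None marker."""
    finished, n_rows = 0, 0
    while finished < n_producers:
        row = row_queue.get()  # blocks; no timeout or empty() guess needed
        if row is None:
            finished += 1
        else:
            n_rows += 1  # real per-row processing would go here
    result_queue.put(n_rows)


def main(files):
    row_queue = multiprocessing.Queue(2000)  # bounded, like QUEUE_SIZE
    result_queue = multiprocessing.Queue()
    producers = [multiprocessing.Process(target=produce, args=(f, row_queue))
                 for f in files]
    consumer = multiprocessing.Process(
        target=consume, args=(row_queue, len(producers), result_queue))
    for p in producers:
        p.start()
    consumer.start()
    n_rows = result_queue.get()  # read the result before joining the consumer
    consumer.join()
    # The consumer drained row_queue, so every producer's feeder thread
    # has flushed its rows and these joins return.
    for p in producers:
        p.join()
    return n_rows


if __name__ == "__main__":
    print(main(["file1", "file2", "file3"]))  # 9
```

With explicit end markers, the consumer no longer relies on a timeout or Queue.Empty to guess that all rows have been processed.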