Haifeng Zhang

Reputation: 31895

how to quit python script after multiprocessing processes are done?

Update: with the help of dano, I solved this problem.

I didn't invoke join() on the producers, which made my script hang. I only needed to add one line, as dano said:

...
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.daemon = True
producer.start()
...

Old script:

import csv
import gzip
import multiprocessing
import sys
import Queue

QUEUE_SIZE = 2000


def produce(file_queue, row_queue):

    while not file_queue.empty():
        src_file = file_queue.get()
        zip_reader = gzip.open(src_file, 'rb')

        try:
            csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER)

            for row in csv_reader:
                new_row = process_sdp_row(row)
                if new_row:
                    row_queue.put(new_row)
        finally:
            zip_reader.close()


def consume(row_queue):
    '''Processes all rows; once the queue is empty, break the infinite loop.'''
    while True:
        try:
            # takes a row from queue and process it
            pass
        except multiprocessing.TimeoutError as toe:
            print "timeout, all rows have been processed, quit."
            break
        except Queue.Empty:
            print "all rows have been processed, quit."
            break
        except Exception as e:
            print "critical error"
            print e
            break


def main(args):

    file_queue = multiprocessing.Queue()
    row_queue = multiprocessing.Queue(QUEUE_SIZE)

    file_queue.put(file1)
    file_queue.put(file2)
    file_queue.put(file3)

    # starts 3 producers
    for i in xrange(3):
        producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
        producer.start()

    # starts 1 consumer
    consumer = multiprocessing.Process(target=consume,args=(row_queue,))
    consumer.start()

    # blocks main thread until consumer process finished
    consumer.join()

    # prints statistics results after consumer is done

    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])

Purpose:

I am using Python 2.7 multiprocessing to start 3 producers that read 3 files at the same time and put the file lines into a row_queue, and 1 consumer that does further processing on all the rows. Statistics are printed in the main thread after the consumer is done, which is why I use the join() method. Finally, sys.exit(0) is invoked to quit the script.
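The intended producer/consumer flow can be sketched as a minimal, self-contained example. This is not the asker's exact code: hypothetical in-memory lists stand in for the gzipped CSV files, each producer puts a None sentinel when it finishes, and the consumer stops after seeing one sentinel per producer, so everything can be joined and the script exits cleanly:

```python
import multiprocessing


def produce(file_queue, row_queue):
    # Stand-in for reading a gzipped CSV: each "file" is a list of rows.
    while not file_queue.empty():
        try:
            rows = file_queue.get_nowait()
        except Exception:  # queue drained by another producer
            break
        for row in rows:
            row_queue.put(row)
    row_queue.put(None)  # sentinel: this producer is done


def consume(row_queue, n_producers, result_queue):
    done, total = 0, 0
    while done < n_producers:
        row = row_queue.get()
        if row is None:
            done += 1
        else:
            total += 1
    result_queue.put(total)  # report statistics back to the parent


def run(files):
    file_queue = multiprocessing.Queue()
    row_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for f in files:
        file_queue.put(f)

    producers = [multiprocessing.Process(target=produce,
                                         args=(file_queue, row_queue))
                 for _ in range(3)]
    for p in producers:
        p.start()

    consumer = multiprocessing.Process(target=consume,
                                       args=(row_queue, len(producers),
                                             result_queue))
    consumer.start()

    for p in producers:
        p.join()
    consumer.join()
    return result_queue.get()


if __name__ == "__main__":
    print(run([[1, 2], [3], [4, 5, 6]]))  # 6 rows processed
```

With sentinels, the consumer terminates on its own and no daemon flag is needed; the daemon approach below is the smaller change to the asker's existing script.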

Problem: Cannot quit the script.

I tried replacing sys.exit(0) with print "the end", and "the end" was displayed on the console. Am I doing something wrong? Why doesn't the script quit, and how can I make it quit? Thanks

Upvotes: 3

Views: 2126

Answers (1)

Jan Spurny

Reputation: 5527

Your producers do not have the multiprocessing.Process.daemon property set:

daemon

The process’s daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.daemon
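A minimal illustration of the flag's effect (a sketch, not the asker's code): the child simulates a long-running producer, and because it is daemonic, the parent's exit terminates it instead of the interpreter waiting 60 seconds for it.

```python
import multiprocessing
import time


def worker():
    # Simulates a producer that would otherwise outlive the parent.
    time.sleep(60)


def main():
    p = multiprocessing.Process(target=worker)
    p.daemon = True  # must be set before start() is called
    p.start()
    # The parent returns immediately; the daemonic child is
    # terminated on exit rather than keeping the process alive.


if __name__ == "__main__":
    main()
```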

Just add producer.daemon = True:

...
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.daemon = True
producer.start()
...

That should make it possible for the whole program to end when the consumer is joined.

By the way, you should probably join the producers too.
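Keeping references to the producers and joining them might look like the sketch below (with a trivial stand-in produce function, not the asker's real one), so the main process knows every producer has actually finished before it reads results:

```python
import multiprocessing


def produce(q):
    q.put(1)  # trivial stand-in for the real producer work


def main():
    q = multiprocessing.Queue()
    producers = []
    for _ in range(3):
        p = multiprocessing.Process(target=produce, args=(q,))
        p.daemon = True
        p.start()
        producers.append(p)

    for p in producers:  # ensure every producer has finished
        p.join()

    return sum(q.get() for _ in range(3))


if __name__ == "__main__":
    print(main())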

Upvotes: 1
