Michael Dorner

Reputation: 20185

How to close connected multiprocessing pipes in Python properly

I have a worker process that is fed with work and sends back the processed results to the parent process.

How can I close those two pipes properly?

What I did so far:

import multiprocessing as mp

def worker(v: str, work_in, result_in):
    while True:
        number_str = work_in.recv()
        print('Processing', number_str)
        result = number_str + v
        result_in.send(result)


# The guard is required on platforms that use the 'spawn' start method (Windows, macOS)
if __name__ == '__main__':
    work_out, work_in = mp.Pipe()
    result_out, result_in = mp.Pipe()

    for number in range(10):
        number_str = str(number)
        print('Sending', number_str)
        work_in.send(number_str)
    # work_in.close()

    worker_process = mp.Process(target=worker, args=('_processed', work_out, result_in,))
    worker_process.start()

    for _ in range(10):
        processed_number_str = result_out.recv()
        print('Received', processed_number_str)
    result_out.close()

    worker_process.join()

The results are as expected (I hope). But if I don't close work_in, the program never ends; if I do close it, the worker crashes with an EOFError.

I also welcome any stylistic or technical advice for using pipes in Python.

Upvotes: 1

Views: 724

Answers (1)

Fnaxiom

Reputation: 396

If you don't close the work_in connection, the worker process never finishes: it runs an infinite while True loop, so after the last item it blocks in recv() waiting forever for another message, and worker_process.join() never returns.

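When you do close work_in, the worker's recv() raises EOFError as soon as it has read everything that was sent, because there is nothing left to receive and the other end was closed. This is the documented behavior of Connection.recv(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection. Since the while True loop does not catch that exception, the worker crashes.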
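A minimal single-process sketch of that rule (the helper name drain is made up for illustration): messages that are already in the pipe are still delivered after the sender closes its end, and only after that does recv() raise EOFError.

```python
import multiprocessing as mp


def drain(conn):
    """Hypothetical helper: receive until EOF and return everything received."""
    received = []
    while True:
        try:
            received.append(conn.recv())
        except EOFError:
            # Only raised once the buffer is empty AND the other end is closed.
            return received


if __name__ == '__main__':
    sender, receiver = mp.Pipe()
    sender.send('0')
    sender.send('1')
    sender.close()  # without this line, drain() would block forever
    print(drain(receiver))  # ['0', '1']
```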

So there are different ways to handle this:

If your app is not continuous (there is a definite number of items to process), you can either use a try/except block to detect the EOF, or use the poll() method of the connection to check whether there are any items left to process.

a. Using EOF and closing work_in:

def worker(v: str, work_in, result_in):
    while True:
        try:
            number_str = work_in.recv()
        except EOFError:
            # Raised once everything has been read and the other end is closed
            print('No more data to process. Exiting')
            break
        # Any other exception propagates and ends the process with a traceback
        print('Processing', number_str)
        result = number_str + v
        result_in.send(result)
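Put together with the question's driver, option a could look like the sketch below (worker and variable names follow the question; main and its n parameter are illustrative additions). The order in the parent matters: all work is sent and work_in is closed before the worker starts. As I understand it, with the 'fork' start method a child started while work_in is still open inherits its own copy of that end and would therefore never see EOF. Sending everything up front is only safe for small payloads that fit in the pipe's buffer.

```python
import multiprocessing as mp


def worker(v: str, work_in, result_in):
    while True:
        try:
            number_str = work_in.recv()
        except EOFError:
            # The parent closed work_in and everything has been read
            break
        result_in.send(number_str + v)


def main(n: int = 10) -> list:
    work_out, work_in = mp.Pipe()
    result_out, result_in = mp.Pipe()

    # Queue up all the work, then close the sending end so the worker
    # eventually gets EOF (small payloads only: they must fit in the buffer).
    for number in range(n):
        work_in.send(str(number))
    work_in.close()

    worker_process = mp.Process(target=worker, args=('_processed', work_out, result_in))
    worker_process.start()

    # The parent knows how many results to expect, so it reads exactly n.
    results = [result_out.recv() for _ in range(n)]
    worker_process.join()
    result_out.close()
    return results


if __name__ == '__main__':
    print(main())
```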

b. Using poll() (wait up to 1 second for an item; if none arrives, exit) without closing the work_in connection:

def worker(v: str, work_in, result_in):
    while work_in.poll(timeout=1):
        number_str = work_in.recv()
        print('Processing', number_str)
        result = number_str + v
        result_in.send(result)
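One caveat with option b: if the parent closes work_in anyway, my understanding is that poll() no longer waits out the timeout (on Linux it reports the closed connection as readable), and the following recv() raises EOFError, which the worker above does not catch. A sketch of a worker that tolerates both cases; the idle_timeout parameter is a hypothetical addition, not part of the approaches above.

```python
import multiprocessing as mp


def worker(v: str, work_in, result_in, idle_timeout: float = 1.0):
    # Option b: stop when nothing arrives within idle_timeout seconds ...
    while work_in.poll(timeout=idle_timeout):
        try:
            number_str = work_in.recv()
        except EOFError:
            # ... option a: or stop when work_in was closed and all data is read.
            break
        result_in.send(number_str + v)


if __name__ == '__main__':
    work_out, work_in = mp.Pipe()
    result_out, result_in = mp.Pipe()
    work_in.send('0')
    work_in.close()
    worker('_processed', work_out, result_in)  # exits via EOFError or, at worst, after idle_timeout
    print(result_out.recv())  # 0_processed
```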

If your app is continuous, you need a slightly more complex protocol: send the subprocess a message telling it to terminate, either over a different pipe (or queue) or as a particular value on the same one, and have the worker break out of its loop when that message arrives.
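A minimal sketch of the "same pipe, particular message" variant, assuming None never occurs as a real work item (the STOP name and the main driver are illustrative). Because the worker never relies on EOF, it can be started before any work is sent regardless of start method, and since the parent reads each result right after sending the item, neither pipe's buffer can fill up.

```python
import multiprocessing as mp

STOP = None  # hypothetical sentinel; must never occur as real work


def worker(v: str, work_in, result_in):
    while True:
        number_str = work_in.recv()
        if number_str is STOP:
            break  # the parent asked us to shut down
        result_in.send(number_str + v)


def main(n: int = 10) -> list:
    work_out, work_in = mp.Pipe()
    result_out, result_in = mp.Pipe()
    worker_process = mp.Process(target=worker, args=('_processed', work_out, result_in))
    worker_process.start()

    results = []
    for number in range(n):
        work_in.send(str(number))
        results.append(result_out.recv())

    work_in.send(STOP)  # tell the worker to leave its loop
    worker_process.join()
    work_in.close()
    result_out.close()
    return results


if __name__ == '__main__':
    print(main())
```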

Upvotes: 1
