Reputation: 20185
I have a worker process that is fed with work and sends back the processed results to the parent process.
How can I close those two pipes properly?
What I did so far:
import multiprocessing as mp

def worker(v: str, work_in, result_in):
    while True:
        number_str = work_in.recv()
        print('Processing', number_str)
        result = number_str + v
        result_in.send(result)

work_out, work_in = mp.Pipe()
result_out, result_in = mp.Pipe()
for number in range(10):
    number_str = str(number)
    print('Sending', number_str)
    work_in.send(number_str)
# work_in.close()
worker_process = mp.Process(target=worker, args=('_processed', work_out, result_in,))
worker_process.start()
for _ in range(10):
    processed_number_str = result_out.recv()
    print('Received', processed_number_str)
result_out.close()
worker_process.join()
The results are as expected (I hope). But if I don't close work_in, the program never ends, and if I do close it, it crashes with an EOFError.
I also welcome any stylistic or technical advice for using pipes in Python.
Upvotes: 1
Views: 724
Reputation: 396
If you don't close the work_in connection, the worker process never finishes: it is in an infinite loop (while True), so recv() simply blocks forever waiting for the next message.
When you do close work_in, recv() inside the while True loop raises EOFError once there is nothing left to receive and the other end has been closed, as mentioned in the documentation: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.
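To see that behaviour in isolation, here is a minimal single-process sketch. The helper name drain is made up for illustration, and I use a one-way pipe (duplex=False) only to make it obvious which end sends and which end receives:

```python
import multiprocessing as mp


def drain(conn):
    """Receive until the sending end is closed, then return everything read."""
    items = []
    while True:
        try:
            items.append(conn.recv())
        except EOFError:
            # Raised once nothing is left to receive and the other end was closed.
            break
    return items


if __name__ == "__main__":
    # duplex=False returns (receive-only end, send-only end).
    receiver, sender = mp.Pipe(duplex=False)
    for n in range(3):
        sender.send(str(n))
    sender.close()  # tells the reader that no more data will arrive
    print(drain(receiver))
```

The messages sent before close() are still delivered; EOFError only shows up after the last one has been read.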
So, there are different ways to handle this:
If your app is not continuous (it has a definite number of items to process), you can either use a try/except block to detect the EOFError or use the connection's poll method to check whether there are any items left to process.
a. Using EOFError and closing work_in:
def worker(v: str, work_in, result_in):
    eof = False
    while not eof:
        try:
            number_str = work_in.recv()
            print('Processing', number_str)
            result = number_str + v
            result_in.send(result)
        except EOFError:
            # recv() raises EOFError once the sender has closed and no data is left
            print('No more data to process. Exiting')
            eof = True
        except Exception:
            print("Oh we forgot this exception!")
            raise  # re-raise so the original traceback is preserved
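b. Using poll (wait up to 1 second for an item and exit if none arrives), without closing the work_in connection:
def worker(v: str, work_in, result_in):
    # poll() returns False if nothing arrives within the timeout, which ends the loop,
    # so the worker also exits if the parent is merely slow to send the next item
    while work_in.poll(timeout=1):
        number_str = work_in.recv()
        print('Processing', number_str)
        result = number_str + v
        result_in.send(result)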
If your app is continuous, you will need something a bit more involved: send the subprocess a message, either via a separate queue or via the same one using a particular sentinel value, to let it know that it should terminate, and have it break out of the loop when that message arrives.
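A rough sketch of the "same connection, particular message" variant could look like this. STOP is a hypothetical sentinel I picked for illustration (None works as long as None is never real work), and the connection names are mine, not from the question:

```python
import multiprocessing as mp

STOP = None  # hypothetical sentinel: any value that can never be real work


def worker(suffix, work_conn, result_conn):
    """Process items until the STOP sentinel arrives, then leave the loop."""
    while True:
        item = work_conn.recv()
        if item is STOP:
            break  # the parent asked us to terminate
        result_conn.send(item + suffix)


if __name__ == "__main__":
    # One-way pipes: each call returns (receive-only end, send-only end).
    work_recv, work_send = mp.Pipe(duplex=False)
    result_recv, result_send = mp.Pipe(duplex=False)

    proc = mp.Process(target=worker, args=("_processed", work_recv, result_send))
    proc.start()

    for n in range(3):
        work_send.send(str(n))
        print("Received", result_recv.recv())

    work_send.send(STOP)  # tell the worker to stop
    proc.join()
```

The advantage over poll is that the worker does not depend on timing: it keeps waiting however long the parent takes, and stops only when it is told to.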
Upvotes: 1