Reputation: 46
To take advantage of several CPU cores in a Python program, I am using the multiprocessing
module and sending data via its Pipe
class. But when the main program closes the sending end, the child processes block in recv()
instead of raising an EOFError
exception. This is caused by file descriptors left open, which need to be closed in the other process's context first, as described in these (and other) answers:
Why doesn't pipe.close() cause EOFError during pipe.recv() in python multiprocessing?
Python multiprocessing pipe recv() doc unclear or did I miss anything?
My problem is that when consecutively creating two Processes with Pipes, the second one inherits the remaining "parent" end file descriptor of the first one's Pipe. So closing the first Pipe again leads to hanging instead of an EOFError,
even though each Pipe's unused ends were closed as recommended.
This code illustrates the problem, Linux only:
import os
import time
import multiprocessing as mp
import subprocess


class MeasurementWriter:
    def __init__(self, name):
        self.name = name
        self.parent_conn = None
        self.worker = None

    def open(self):
        conn_pair = mp.Pipe()
        self.worker = mp.Process(target=self.run, name=self.name, args=(conn_pair,))
        self.worker.start()
        self.parent_conn, child_conn = conn_pair
        print('pid %d started %d; fds: %d %d'
              % (os.getpid(), self.worker.pid,
                 self.parent_conn.fileno(), child_conn.fileno()))
        # Close the other end, as it is not needed in our process context
        child_conn.close()
        subprocess.call(["ls", "-l", "/proc/%d/fd" % os.getpid()])

    def close(self):
        if self.parent_conn is None:
            print('not open')
            return
        print('closing pipe', self.parent_conn.fileno())
        self.parent_conn.close()
        print('joining worker')
        self.worker.join()  # HANGS if more than one mp.Process has been started!

    def run(self, conn_pair):
        parent_conn, conn = conn_pair
        print('%s pid %d started; fds: %d %d'
              % (self.name, os.getpid(), parent_conn.fileno(), conn.fileno()))
        # Close the other end, as it is not needed in our process context
        parent_conn.close()
        time.sleep(0.5)
        print(self.name, 'parent_conn.closed =', parent_conn.closed)
        subprocess.call(["ls", "-l", "/proc/%d/fd" % os.getpid()])
        try:
            print(self.name, 'recv blocking...')
            data = conn.recv()
            print(self.name, 'recv', data)
        except EOFError:
            print(self.name, 'EOF')
        conn.close()


if __name__ == '__main__':
    a = MeasurementWriter('A')
    a.open()
    # Increase fd numbers to make them recognizable
    n = open('/dev/null')
    z = open('/dev/zero')
    # Wait for debug printing to complete
    time.sleep(1)
    b = MeasurementWriter('B')
    b.open()  # Comment out to see clean exit
    time.sleep(2)
    # Clean up
    a.close()  # HANGS: The parent_conn fd is still open in the second Process
    b.close()
The output is as follows (some uninteresting fd lines omitted). Tested with Python 3.5 and 3.8.10 under Linux:
pid 592770 started 592771; fds: 3 4
A pid 592771 started; fds: 3 4
total 0
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 3 -> 'socket:[8294651]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 5 -> 'pipe:[8294653]'
A parent_conn.closed = True
total 0
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 4 -> 'socket:[8294652]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 5 -> /dev/null
l-wx------ 1 acolomb acolomb 64 Mar 18 19:02 6 -> 'pipe:[8294653]'
A recv blocking...
pid 592770 started 592774; fds: 7 8
B pid 592774 started; fds: 7 8
total 0
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 3 -> 'socket:[8294651]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 4 -> /dev/null
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 5 -> 'pipe:[8294653]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 6 -> /dev/zero
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 7 -> 'socket:[8294672]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 9 -> 'pipe:[8294674]'
B parent_conn.closed = True
total 0
l-wx------ 1 acolomb acolomb 64 Mar 18 19:02 10 -> 'pipe:[8294674]'
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 3 -> 'socket:[8294651]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 4 -> /dev/null
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 5 -> 'pipe:[8294653]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 6 -> /dev/zero
lrwx------ 1 acolomb acolomb 64 Mar 18 19:02 8 -> 'socket:[8294673]'
lr-x------ 1 acolomb acolomb 64 Mar 18 19:02 9 -> /dev/null
B recv blocking...
closing pipe 3
joining worker
We can see that the youngest process (B) has inherited fd number 3, which belongs to the parent end of A's Pipe. Closing it in the main process therefore does not terminate A's process, as the underlying socket is still referenced by B. How can I prevent subsequent child processes from inheriting the file descriptors of another child's Pipe
objects?
For this simple example, switching the order of the .close()
calls would probably help, but in reality the writers may be opened and closed in arbitrary order based on user interactions. The intended use is to write several output streams (one MeasurementWriter
instance for each), with transparent compression handled in an associated child process so that the main process is not blocked regularly.
One suggestion I found at https://microeducate.tech/using-python-multiprocessing-pipes/ keeps track of all pipe ends in the parent process using a list, and then closes all unrelated ones in each newly created child process. But I have no good place for such a "manager", as these objects come and go during the app lifetime.
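For reference, that tracking approach could be sketched roughly like this (the names _worker, start_worker and _open_parent_conns are my own, not from the article): each parent-side connection is recorded in a module-level list, and every new child receives the earlier parent ends as arguments so it can close its inherited copies of them.

```python
import multiprocessing as mp

# Registry of parent-side connections handed out so far (hypothetical name)
_open_parent_conns = []

def _worker(parent_conn, child_conn, earlier_parent_conns):
    # Close our own pipe's unused parent end ...
    parent_conn.close()
    # ... and the inherited copies of all earlier pipes' parent ends,
    # so that closing one of them in the main process raises EOFError here.
    for c in earlier_parent_conns:
        c.close()
    while True:
        try:
            item = child_conn.recv()
        except EOFError:
            break
        print('worker received', item)
    child_conn.close()

def start_worker():
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=_worker,
                      args=(parent_conn, child_conn, list(_open_parent_conns)))
    proc.start()
    child_conn.close()  # not needed in the parent's context
    _open_parent_conns.append(parent_conn)
    return parent_conn, proc
```

With this in place, closing one worker's parent connection reliably ends that worker, because no later child still holds a stray copy of the descriptor; the drawback remains that some long-lived place has to own the registry.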
Upvotes: 1
Views: 985
Reputation: 44128
In a real-life situation one process would probably be in a loop doing recv
calls on its connection. Since we see that getting an EOFError
exception is undependable when the connection is closed on the other end, the simplest solution is for the sending end to signal "end of file" by issuing a send
call on the connection with a special sentinel item that cannot be mistaken for a normal data item. None
is often suitable for that purpose.
So modify method close
to be:
    def close(self):
        if self.parent_conn is None:
            print('not open')
            return
        print('closing pipe', self.parent_conn.fileno())
        self.parent_conn.send(None)  # Sentinel
        self.parent_conn.close()
        print('joining worker')
        self.worker.join()  # No longer hangs: the sentinel unblocks recv()
And a more realistic run
method might be:
    def run(self, conn_pair):
        parent_conn, conn = conn_pair
        print('%s pid %d started; fds: %d %d'
              % (self.name, os.getpid(), parent_conn.fileno(), conn.fileno()))
        # Close the other end, as it is not needed in our process context
        parent_conn.close()
        time.sleep(0.5)
        print(self.name, 'parent_conn.closed =', parent_conn.closed)
        subprocess.call(["ls", "-l", "/proc/%d/fd" % os.getpid()])
        try:
            while True:
                print(self.name, 'recv blocking...')
                data = conn.recv()
                if data is None:  # Sentinel?
                    break
                print(self.name, 'recv', data)
        except EOFError:
            print(self.name, 'EOF')
        conn.close()
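For completeness, the sentinel pattern also works outside the class above. A minimal standalone sketch (function and variable names are my own):

```python
import multiprocessing as mp

def consumer(conn):
    # Receive items until the None sentinel arrives
    while True:
        item = conn.recv()
        if item is None:  # sentinel: the sender is done
            break
        print('received', item)
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=consumer, args=(child_conn,))
    proc.start()
    child_conn.close()  # unused in the parent's context
    for i in range(3):
        parent_conn.send(i)
    parent_conn.send(None)  # sentinel instead of relying on EOFError
    parent_conn.close()
    proc.join()  # returns promptly, regardless of inherited descriptors
```

Because the shutdown signal travels in-band through the pipe, it is immune to the stray-descriptor problem: it does not matter how many other processes still hold copies of the parent end.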
Upvotes: 2