Reputation: 1049
I am new to interprocess communication and I am trying to understand how os.pipe and os.fork are used together in Python.
In the code below, if I uncomment the commented-out lines, a "Broken pipe" error occurs; otherwise it works fine.
The idea is to have a SIGCHLD handler that runs when the child process exits, and to increment the respective counters when the child-only function (run_child) and the parent-only function (sigchld_handler) execute. Since a forked process has its own copy of memory and its changes are not reflected in the parent process, the attempt is to let the child process send a message to the parent process via a pipe and let the parent process update the counter.
import os
import signal
import time
class A(object):
    """Fork a child on demand and count parent-side and child-side runs.

    This is the version from the question: one pipe is created in
    ``__init__`` and the same two descriptors are used for every child.
    """

    def __init__(self):
        """Reset the counters, create the pipe and install the SIGCHLD handler."""
        self.parent = 0
        self.child = 0
        # PID returned by os.fork(); None while no child is being tracked.
        self._child_pid = None
        # A single pipe is created here, once, for the lifetime of the object.
        self.rd , self.wr = os.pipe()
        print self.rd , self.wr
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        """Handle SIGCHLD: bump the parent counter and forget the child PID.

        ``a`` and ``b`` are the two arguments the ``signal`` module passes to
        a handler (signal number and current stack frame); neither is used.
        """
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        # The three lines below are deliberately left commented out: the
        # question reports that enabling them produces a "Broken Pipe" error
        # after a few iterations. They wrap the read end in a file object,
        # read the child's message and close the file object again.
        #rf = os.fdopen(self.rd, 'r')
        #self.child = int(rf.read())
        #rf.close()
        self._child_pid = None

    def run_child(self):
        """Child-side work: bump the counter, write it to the pipe and exit.

        Never returns; the process is terminated with ``os._exit``.
        """
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        # Wrap the write end of the shared pipe in a file object.
        wr = os.fdopen(self.wr,'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        """Fork a new child unless one is still recorded as running."""
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self._child_pid = os.fork()
            # os.fork() returns 0 in the child, so only the child enters here.
            if not self._child_pid:
                self.run_child()
# Drive the example: call run() six times, with time.sleep(4) after each call.
a = A()
i = 0
while i <= 5:
    a.run()
    time.sleep(4)
    i += 1
Interestingly, the error appears only after the first few iterations. Can somebody please explain why the error occurs and what I should do to solve it?
EDIT 1: There are a couple of similar examples: ex1 , ex2 , ex3 . I actually used them to learn, but in my case I am extending the examples to run in a loop so that it acts more like a producer/consumer queue. I understand it might not be a good approach, since the multiprocessing/Queue modules are available in Python, but I want to understand the mistake I am making here.
EDIT 2 (solution):
Based on @S.kozlov's answer, I modified the code to create a new pipe for every communication. Here is the modified code.
import os
import pdb
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
os.close(self.wr)
print "Main run count : (parent) ", self.parent
rd = os.fdopen(self.rd, 'r')
self.child = int(rd.read())
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self.rd , self.wr = os.pipe()
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
# Drive the example: call run() six times, with time.sleep(4) after each call.
a = A()
i = 0
while i <= 5:
    a.run()
    time.sleep(4)
    i += 1
With this, the output should look (something) like this.
Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6
Upvotes: 0
Views: 1149
Reputation: 848
The problem with your code is that you are trying to reuse one pipe several times, which is not a valid use of a pipe in general. The exception you are getting is simply telling you: "Hey, you have closed this pipe on the previous run. Once a pipe is closed, it's closed."
So you can change your code to create a pipe for each child, keep one end (the read end) in the parent, and give the other end to the child. Then it should work.
Edit 1. I've updated your code to use "one pipe for every child". It's not how good code is supposed to look, but I hope it will help in an educational sense.
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
os.close(self.wr)
rf = os.fdopen(self.rd, 'r')
message = rf.read()
rf.close()
print "Code from child [", self._child_pid, "]: ", message
self.rd = None
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "Hello from %s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
rd, wr = os.pipe()
self.rd = rd
self.wr = wr
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
# Drive the example: call run() six times, with time.sleep(4) after each call.
a = A()
i = 0
while i <= 5:
    a.run()
    time.sleep(4)
    i += 1
Upvotes: 2