Reputation: 45
(New to Python and OO - I apologize in advance if I'm being stupid here)
I'm trying to define a Python 3 class such that when an instance is created two subprocesses are also created. These subprocesses do some work in the background (sending and listening for UDP packets). The subprocesses also need to communicate with each other and with the instance (updating instance attributes based on what is received from UDP, among other things).
I am creating my subprocesses with os.fork because I don't understand how to use the subprocess module to send multiple file descriptors to child processes - maybe this is part of my problem.
The problem I am running into is how to kill the child processes when the instance is destroyed. My understanding is I shouldn't use destructors in Python because stuff should get cleaned up and garbage collected automatically by Python. In any case, the following code leaves the children running after it exits.
What is the right approach here?
import os
from time import sleep

class A:
    def __init__(self):
        sfp, pts = os.pipe()  # senderFromParent, parentToSender
        pfs, stp = os.pipe()  # parentFromSender, senderToParent
        pfl, ltp = os.pipe()  # parentFromListener, listenerToParent
        sfl, lts = os.pipe()  # senderFromListener, listenerToSender
        pid = os.fork()
        if pid:
            # parent
            os.close(sfp)
            os.close(stp)
            os.close(lts)
            os.close(ltp)
            os.close(sfl)
            self.pts = os.fdopen(pts, 'w')  # allow creator of A inst to
            self.pfs = os.fdopen(pfs, 'r')  # send and receive messages
            self.pfl = os.fdopen(pfl, 'r')  # to/from sender and
        else:                               # listener processes
            # sender or listener
            os.close(pts)
            os.close(pfs)
            os.close(pfl)
            pid = os.fork()
            if pid:
                # sender
                os.close(ltp)
                os.close(lts)
                sender(self, sfp, stp, sfl)
            else:
                # listener
                os.close(stp)
                os.close(sfp)
                os.close(sfl)
                listener(self, ltp, lts)

def sender(a, sfp, stp, sfl):
    sfp = os.fdopen(sfp, 'r')  # receive messages from parent
    stp = os.fdopen(stp, 'w')  # send messages to parent
    sfl = os.fdopen(sfl, 'r')  # received messages from listener
    while True:
        # send UDP packets based on messages from parent and process
        # responses from listener (some responses passed back to parent)
        print("Sender alive")
        sleep(1)

def listener(a, ltp, lts):
    ltp = os.fdopen(ltp, 'w')  # send messages to parent
    lts = os.fdopen(lts, 'w')  # send messages to sender
    while True:
        # listen for and process incoming UDP packets, sending some
        # to sender and some to parent
        print("Listener alive")
        sleep(1)

a = A()
Running the above produces:
Sender alive
Listener alive
Sender alive
Listener alive
...
Upvotes: 4
Views: 1518
Reputation: 40853
There's no point trying to reinvent the wheel. subprocess does all you want and more, though multiprocessing will simplify the process, so we'll use that.
You can use multiprocessing.Pipe to create connections and send messages back and forth between a pair of processes. A pipe can be "duplex" (the default), so both ends can send and receive if that's what you need. You can use multiprocessing.Manager to create a shared Namespace between processes (sharing state between the listener, sender and parent). There is a caveat when using a managed list, dict or Namespace (created with Manager.list(), Manager.dict() or Manager.Namespace()): in-place changes to a mutable object stored in one of them are not propagated to other processes until the object is reassigned to the managed object.
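For example:
namespace.attr = {}
# namespace.attr returns a copy, so this change is not cascaded to other processes
namespace.attr["key"] = "value"
# to force the change to other processes: fetch a copy, modify it, reassign it
attr = namespace.attr
attr["key"] = "value"
namespace.attr = attr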
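To make the caveat above concrete, here is a minimal runnable sketch. The function name demo_nested_update is made up for illustration, and pinning the "fork" start method is an assumption that matches the Unix setting of the question rather than anything the approach requires.

```python
import multiprocessing as mp


def demo_nested_update():
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        namespace = manager.Namespace()
        namespace.attr = {}

        # Reading namespace.attr returns a copy of the dict held by the
        # manager process, so this mutation only touches a temporary object.
        namespace.attr["key"] = "lost"
        after_in_place = namespace.attr

        # Fetch a copy, change it, then assign it back to publish the change.
        local_copy = namespace.attr
        local_copy["key"] = "value"
        namespace.attr = local_copy
        after_reassign = namespace.attr

    return after_in_place, after_reassign


if __name__ == "__main__":
    print(demo_nested_update())
```

The first returned value is still an empty dict, while the second contains the new key, which is why the reassignment step matters.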
If more than one process needs to write to the same attribute, you will need synchronisation to prevent a concurrent modification by one process from wiping out changes made by another process.
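As a rough sketch of that synchronisation, the following uses a Lock created by the manager to guard a shared counter. The names add_many and run_counters and the counter attribute are hypothetical, and the "fork" start method is again an assumption tied to a Unix-like system.

```python
import multiprocessing as mp


def add_many(namespace, lock, times):
    for _ in range(times):
        # "+=" on a proxy is a read followed by a separate write, so both
        # steps are guarded by the lock to avoid losing another update.
        with lock:
            namespace.counter += 1


def run_counters(workers=2, times=50):
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        namespace = manager.Namespace()
        namespace.counter = 0
        lock = manager.Lock()
        procs = [
            ctx.Process(target=add_many, args=(namespace, lock, times))
            for _ in range(workers)
        ]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        return namespace.counter


if __name__ == "__main__":
    print(run_counters())
```

Without the lock, two processes can read the same old value and one increment is silently overwritten.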
Example code:
from multiprocessing import Process, Pipe, Manager

class Reader:
    def __init__(self, writer_conn, namespace):
        self.writer_conn = writer_conn
        self.namespace = namespace

    def read(self):
        self.namespace.msgs_recv = 0
        with self.writer_conn:
            try:
                while True:
                    obj = self.writer_conn.recv()
                    self.namespace.msgs_recv += 1
                    print("Reader got:", repr(obj))
            except EOFError:
                print("Reader has no more data to receive")

class Writer:
    def __init__(self, reader_conn, namespace):
        self.reader_conn = reader_conn
        self.namespace = namespace

    def write(self, msgs):
        self.namespace.msgs_sent = 0
        with self.reader_conn:
            for msg in msgs:
                self.reader_conn.send(msg)
                self.namespace.msgs_sent += 1

def create_child_processes(reader, writer, msgs):
    p_write = Process(target=Writer.write, args=(writer, msgs))
    p_write.start()
    # This is very important, otherwise the reader will hang after the writer
    # has finished. The order of this statement, coming after p_write.start()
    # but before p_read.start(), is also important. Look up file descriptors
    # and how they are inherited by child processes on Unix, and how any open
    # fd for the write side of a pipe keeps the read end from seeing EOF.
    writer.reader_conn.close()
    p_read = Process(target=Reader.read, args=(reader,))
    p_read.start()
    return p_read, p_write

def run_mp_pipe():
    manager = Manager()
    namespace = manager.Namespace()
    read_conn, write_conn = Pipe()
    reader = Reader(read_conn, namespace)
    writer = Writer(write_conn, namespace)
    p_read, p_write = create_child_processes(reader, writer,
        msgs=["hello", "world", {"key", "value"}])
    print("starting")
    p_write.join()
    p_read.join()
    print("done")
    print(namespace)
    assert namespace.msgs_sent == namespace.msgs_recv

if __name__ == "__main__":
    run_mp_pipe()
Output:
starting
Reader got: 'hello'
Reader got: 'world'
Reader got: {'key', 'value'}
Reader has no more data to receive
done
Namespace(msgs_recv=3, msgs_sent=3)
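The example above only sends messages one way. For the duplex case mentioned earlier, a minimal sketch could look like the following. The names echo_upper and round_trip are invented for illustration, a None message is used as a made-up stop signal instead of relying on EOF, and the "fork" start method is an assumption for a Unix-like system.

```python
import multiprocessing as mp


def echo_upper(conn):
    # Child side: receive a string, send back its upper-case form.
    with conn:
        while True:
            msg = conn.recv()
            if msg is None:  # hypothetical stop signal chosen for this sketch
                break
            conn.send(msg.upper())


def round_trip(messages):
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe(duplex=True)
    worker = ctx.Process(target=echo_upper, args=(child_conn,))
    worker.start()
    child_conn.close()  # the parent only uses its own end

    replies = []
    with parent_conn:
        for msg in messages:
            parent_conn.send(msg)               # parent -> child
            replies.append(parent_conn.recv())  # child -> parent, same pipe
        parent_conn.send(None)                  # ask the child to stop
    worker.join()
    return replies


if __name__ == "__main__":
    print(round_trip(["hello", "world"]))
```

An explicit stop message sidesteps the inherited-file-descriptor issue described in the comment in create_child_processes, because the child does not have to wait for EOF.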
Upvotes: 0
Reputation: 26
Actually, you should use destructors. Python objects have a __del__ method, which is called just before the object is garbage-collected.
In your case, you should define
    def __del__(self):
        ...
within your class A that sends the appropriate kill signals to your child processes. Don't forget to store the child PIDs in your parent process, of course.
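A minimal sketch of that idea, assuming a Unix-like system with os.fork. The class name Owner, the close method and the sleeping placeholder loop are all hypothetical stand-ins for class A and its real workers. Since Python does not guarantee that __del__ runs for objects still alive at interpreter exit, the sketch also exposes an explicit close() and context-manager support, with __del__ only as a fallback.

```python
import os
import signal
import time


class Owner:
    """Hypothetical stand-in for class A: forks workers and keeps their PIDs."""

    def __init__(self, n_children=2):
        self.child_pids = []
        for _ in range(n_children):
            pid = os.fork()
            if pid == 0:
                # Child: placeholder for the real sender/listener loop.
                try:
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    while True:
                        time.sleep(1)
                finally:
                    os._exit(0)  # never fall back into the parent's code
            self.child_pids.append(pid)

    def close(self):
        # Explicit cleanup; safe to call more than once.
        for pid in self.child_pids:
            try:
                os.kill(pid, signal.SIGTERM)
                os.waitpid(pid, 0)  # reap the child so no zombie is left
            except (ProcessLookupError, ChildProcessError):
                pass
        self.child_pids = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        self.close()  # fallback only; not guaranteed at interpreter exit


if __name__ == "__main__":
    with Owner() as owner:
        print("children:", owner.child_pids)
    print("children terminated")
```

Using the with block (or calling close() directly) makes the cleanup deterministic instead of depending on when the garbage collector runs.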
Upvotes: 1
Reputation: 2233
As suggested here, you can create a child process using the multiprocessing module with the daemon=True flag, so that it is terminated when the parent process exits.
Example:
from multiprocessing import Process
p = Process(target=f, args=('bob',))
p.daemon = True
p.start()
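A slightly fuller sketch of the same idea, with a hypothetical heartbeat function standing in for f and the "fork" start method assumed for a Unix-like system. When the main process exits normally, it attempts to terminate its daemonic children, so no explicit kill is needed in that case. Note that a daemonic process is not allowed to create child processes of its own.

```python
import multiprocessing as mp
import time


def heartbeat(name):
    # Placeholder for long-running background work (hypothetical).
    while True:
        time.sleep(0.1)


def start_daemon_worker():
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    worker = ctx.Process(target=heartbeat, args=("bob",), daemon=True)
    worker.start()
    return worker


if __name__ == "__main__":
    worker = start_daemon_worker()
    time.sleep(0.3)
    print("worker alive:", worker.is_alive())
    # No join() or terminate() here: the daemonic child is terminated
    # when the main process exits normally.
```

If the parent is killed abruptly (for example with SIGKILL), this cleanup does not run, so the approach covers normal exits only.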
Upvotes: 0