Samir Nagheenanajar

Reputation: 45

Killing child processes created in class __init__ in Python

(New to Python and OO - I apologize in advance if I'm being stupid here)

I'm trying to define a Python 3 class such that when an instance is created two subprocesses are also created. These subprocesses do some work in the background (sending and listening for UDP packets). The subprocesses also need to communicate with each other and with the instance (updating instance attributes based on what is received from UDP, among other things).

I am creating my subprocesses with os.fork because I don't understand how to use the subprocess module to send multiple file descriptors to child processes - maybe this is part of my problem.

The problem I am running into is how to kill the child processes when the instance is destroyed. My understanding is that I shouldn't use destructors in Python, because objects should be cleaned up and garbage-collected automatically. In any case, the following code leaves the children running after the parent exits.

What is the right approach here?

import os
from time import sleep

class A:
    def __init__(self):
        sfp, pts = os.pipe() # senderFromParent, parentToSender
        pfs, stp = os.pipe() # parentFromSender, senderToParent
        pfl, ltp = os.pipe() # parentFromListener, listenerToParent
        sfl, lts = os.pipe() # senderFromListener, listenerToSender
        pid = os.fork()
        if pid:
            # parent
            os.close(sfp)
            os.close(stp)
            os.close(lts)
            os.close(ltp)
            os.close(sfl)
            self.pts = os.fdopen(pts, 'w') # allow creator of A inst to
            self.pfs = os.fdopen(pfs, 'r') # send and receive messages
            self.pfl = os.fdopen(pfl, 'r') # to/from sender and
        else:                              # listener processes
            # sender or listener
            os.close(pts)
            os.close(pfs)
            os.close(pfl)
            pid = os.fork()
            if pid:
                # sender
                os.close(ltp)
                os.close(lts)
                sender(self, sfp, stp, sfl)
            else:
                # listener
                os.close(stp)
                os.close(sfp)
                os.close(sfl)
                listener(self, ltp, lts)

def sender(a, sfp, stp, sfl):
    sfp = os.fdopen(sfp, 'r') # receive messages from parent
    stp = os.fdopen(stp, 'w') # send messages to parent
    sfl = os.fdopen(sfl, 'r') # received messages from listener
    while True:
        # send UDP packets based on messages from parent and process
        # responses from listener (some responses passed back to parent)
        print("Sender alive")
        sleep(1)

def listener(a, ltp, lts):
    ltp = os.fdopen(ltp, 'w') # send messages to parent
    lts = os.fdopen(lts, 'w') # send messages to sender
    while True:
        # listen for and process incoming UDP packets, sending some
        # to sender and some to parent
        print("Listener alive")
        sleep(1)

a = A()

Running the above produces:

Sender alive
Listener alive
Sender alive
Listener alive
...

Upvotes: 4

Views: 1518

Answers (3)

Dunes

Reputation: 40853

There's no point trying to reinvent the wheel. subprocess does all you want and more, though multiprocessing will simplify things, so we'll use that.
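On the question's point about passing several file descriptors with subprocess: subprocess.Popen accepts a pass_fds argument (POSIX only) listing descriptors that stay open in the child under the same numbers. A minimal sketch, assuming a POSIX system; CHILD_CODE and run_child are hypothetical names used only for illustration:

```python
import os
import subprocess
import sys

# Hypothetical child program: reads one line from the fd number in argv[1]
# and writes it back upper-cased to the fd number in argv[2].
CHILD_CODE = """
import os, sys
rfd, wfd = int(sys.argv[1]), int(sys.argv[2])
with os.fdopen(rfd, "r") as r, os.fdopen(wfd, "w") as w:
    w.write(r.readline().upper())
"""

def run_child(message):
    to_child_r, to_child_w = os.pipe()      # parent -> child
    from_child_r, from_child_w = os.pipe()  # child -> parent
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE, str(to_child_r), str(from_child_w)],
        pass_fds=(to_child_r, from_child_w),  # keep these fds open in the child
    )
    # Close the child's ends in the parent, otherwise the parent's read
    # below would never see EOF.
    os.close(to_child_r)
    os.close(from_child_w)
    with os.fdopen(to_child_w, "w") as w:
        w.write(message + "\n")
    with os.fdopen(from_child_r, "r") as r:
        reply = r.read()
    proc.wait()
    return reply

if __name__ == "__main__":
    print(run_child("hello"), end="")
```

Here the child learns its descriptor numbers from the command line; that convention is an assumption of the sketch, and environment variables or fixed numbers would work equally well.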

You can use multiprocessing.Pipe to create a connection and send messages back and forth between a pair of processes. A pipe can be "duplex" (the default), so both ends can send and receive if that's what you need. You can use multiprocessing.Manager to create a shared Namespace between processes (sharing state between the listener, sender and parent). There is one caveat with managed lists, dicts and Namespaces (created with Manager().list(), Manager().dict() and Manager().Namespace()): if you store a mutable object in them, changes made to that object in place are not propagated to other processes until the object is reassigned to the managed container.

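For example:

namespace.attr = {}
# this change is made to a temporary copy and is not seen by other processes
namespace.attr["key"] = "value"
# instead, modify a local copy and reassign it to push the change to the manager
attr = namespace.attr
attr["key"] = "value"
namespace.attr = attr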
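A small runnable check of this behaviour, assuming a local Manager; namespace_demo is a hypothetical helper. Reading ns.attr returns a copy of the stored dict, so the first mutation is lost, while reassigning a modified copy updates the managed value:

```python
from multiprocessing import Manager

def namespace_demo():
    with Manager() as manager:
        ns = manager.Namespace()
        ns.attr = {}
        ns.attr["key"] = "value"  # mutates a temporary copy returned by ns.attr
        lost = ns.attr            # still {}: the change never reached the manager
        attr = ns.attr            # fetch a local copy...
        attr["key"] = "value"     # ...modify it...
        ns.attr = attr            # ...and reassign it to update the managed value
        kept = ns.attr
    return lost, kept

if __name__ == "__main__":
    print(namespace_demo())  # → ({}, {'key': 'value'})
```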

If you need more than one process to write to the same attribute, you will need to use synchronisation to prevent concurrent modification by one process from wiping out changes made by another.
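A minimal sketch of that synchronisation, assuming a counter stored on a managed Namespace and a lock obtained from the same Manager; add_many and count_with_lock are hypothetical names. Without the lock, `ns.count += 1` is a separate read and write through the proxy, so two processes can read the same value and one increment is lost:

```python
from multiprocessing import Manager, Process

def add_many(ns, lock, n):
    for _ in range(n):
        with lock:          # makes the read-modify-write atomic across processes
            ns.count += 1

def count_with_lock(num_procs=4, n=200):
    with Manager() as manager:
        ns = manager.Namespace()
        ns.count = 0
        lock = manager.Lock()
        procs = [Process(target=add_many, args=(ns, lock, n))
                 for _ in range(num_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return ns.count

if __name__ == "__main__":
    print(count_with_lock())  # → 800
```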

Example code:

from multiprocessing import Process, Pipe, Manager

class Reader:

    def __init__(self, writer_conn, namespace):
        self.writer_conn = writer_conn
        self.namespace = namespace

    def read(self):
        self.namespace.msgs_recv = 0
        with self.writer_conn:
            try:
                while True:
                    obj = self.writer_conn.recv()
                    self.namespace.msgs_recv += 1
                    print("Reader got:", repr(obj))
            except EOFError:
                print("Reader has no more data to receive")

class Writer:

    def __init__(self, reader_conn, namespace):
        self.reader_conn = reader_conn
        self.namespace = namespace

    def write(self, msgs):
        self.namespace.msgs_sent = 0
        with self.reader_conn:
            for msg in msgs:
                self.reader_conn.send(msg)
                self.namespace.msgs_sent += 1

def create_child_processes(reader, writer, msgs):
    p_write = Process(target=Writer.write, args=(writer, msgs))
    p_write.start()

    # This is very important, otherwise the reader will hang after the writer has finished.
    # This statement must come after p_write.start() but before
    # p_read.start(). Look up file descriptors and how they are inherited by
    # child processes on Unix: any open fd for the write side of a pipe
    # keeps the read end from ever seeing EOF.
    writer.reader_conn.close()

    p_read = Process(target=Reader.read, args=(reader,))
    p_read.start()

    return p_read, p_write

def run_mp_pipe():

    manager = Manager()
    namespace = manager.Namespace()
    read_conn, write_conn = Pipe()

    reader = Reader(read_conn, namespace)
    writer = Writer(write_conn, namespace)

    p_read, p_write = create_child_processes(reader, writer, 
        msgs=["hello", "world", {"key", "value"}])

    print("starting")

    p_write.join()
    p_read.join()

    print("done")
    print(namespace)
    assert namespace.msgs_sent == namespace.msgs_recv

if __name__ == "__main__":
    run_mp_pipe()

Output:

starting
Reader got: 'hello'
Reader got: 'world'
Reader got: {'key', 'value'}
Reader has no more data to receive
done
Namespace(msgs_recv=3, msgs_sent=3)
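The example above only sends data in one direction. Since Pipe() is duplex by default, each end can both send and receive; a minimal round-trip sketch, where echo_upper and round_trip are hypothetical names and None is an assumed stop sentinel:

```python
from multiprocessing import Pipe, Process

def echo_upper(conn):
    # Child: reply on the same connection until the None sentinel arrives.
    with conn:
        while True:
            msg = conn.recv()
            if msg is None:
                break
            conn.send(msg.upper())

def round_trip(msgs):
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    child_conn.close()  # the parent only uses its own end
    replies = []
    with parent_conn:
        for m in msgs:
            parent_conn.send(m)
            replies.append(parent_conn.recv())
        parent_conn.send(None)  # tell the child to stop
    p.join()
    return replies

if __name__ == "__main__":
    print(round_trip(["hello", "world"]))  # → ['HELLO', 'WORLD']
```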

Upvotes: 0

Brane Čibej

Reputation: 26

Actually, you should use destructors. Python classes can define a __del__ method, which is called when an instance is about to be destroyed.

In your case, you should define

def __del__(self):
   ...

within your class A that sends the appropriate kill signals to your child processes. Don't forget to store the child PIDs in your parent process, of course.
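A minimal sketch of this approach, assuming a POSIX system and replacing the question's sender and listener with a placeholder loop; close() is a hypothetical helper that __del__ calls. Python does not guarantee that __del__ runs for objects still alive when the interpreter exits, so calling close() explicitly is the more reliable path and __del__ serves as a fallback:

```python
import os
import signal
import time

class A:
    def __init__(self):
        self.child_pids = []          # stored in the parent for later cleanup
        for _ in range(2):            # e.g. sender and listener
            pid = os.fork()
            if pid == 0:
                # Child: placeholder for the sender/listener loop.
                try:
                    while True:
                        time.sleep(1)
                finally:
                    os._exit(0)       # never fall back into the parent's code
            self.child_pids.append(pid)

    def close(self):
        for pid in self.child_pids:
            try:
                os.kill(pid, signal.SIGTERM)
                os.waitpid(pid, 0)    # reap the child so no zombie is left
            except (ProcessLookupError, ChildProcessError):
                pass                  # already gone or already reaped
        self.child_pids = []

    def __del__(self):
        self.close()

if __name__ == "__main__":
    a = A()
    print("children:", a.child_pids)
    del a  # in CPython this triggers __del__ immediately
```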

Upvotes: 1

Amaury Medeiros

Reputation: 2233

As suggested here, you can create a child process using the multiprocessing module with the flag daemon=True. Daemonic child processes are terminated by multiprocessing when the parent process exits.

Example:

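from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.daemon = True  # terminated automatically when the parent process exits
    p.start()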
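Applied to the question's class, a sketch might look like the following; sender and listener are placeholders for the question's loops. Daemon processes are only cleaned up when the parent process exits, not when an instance of A is discarded, so the sketch also adds a hypothetical close() method, usable through a with statement, that terminates and joins the children:

```python
import time
from multiprocessing import Process

def sender():
    while True:       # placeholder for the UDP sending loop
        time.sleep(1)

def listener():
    while True:       # placeholder for the UDP listening loop
        time.sleep(1)

class A:
    def __init__(self):
        # daemon=True: multiprocessing terminates these when the parent exits
        self.procs = [Process(target=sender, daemon=True),
                      Process(target=listener, daemon=True)]
        for p in self.procs:
            p.start()

    def close(self):
        for p in self.procs:
            p.terminate()  # sends SIGTERM on POSIX
            p.join()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

if __name__ == "__main__":
    with A() as a:
        print([p.is_alive() for p in a.procs])  # → [True, True]
```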

Upvotes: 0
