slurms
slurms

Reputation: 768

Python multiprocessing exiting cleanly

I've got a daemon that runs a number of child processes intended to maintain a telnet connection to collect data from a bunch of weather stations. I've set it up so that these child processes read from that telnet connection forever, passing the weather readings back to the parent process via a multiprocessing.Queue. I can't seem to get these child processes to exit cleanly when I stop the daemon with ./test.py stop. Is there an easy way to close the child processes on exit? A quick google mentioned someone using multiprocessing.Event, what's the best way to set this event on exit to ensure the processes exit? Here's our current code:

from daemon import runner
from multiprocessing import Process, Queue
import telnetlib

from django.utils.encoding import force_text

from observations.weather.models import WeatherStation
import os

os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'


def read_weather_data(name, ip_address, port, queue):
    print "Started process to get data for", name
    client = telnetlib.Telnet(ip_address, port)

    while True:
        response = client.read_until('\r\n'.encode('utf8'))
        queue.put((name, force_text(response)))

    client.close()


class App(object):
    def __init__(self):
        self.stdin_path = '/dev/null'
        self.stdout_path = '/dev/tty'
        self.stderr_path = '/dev/tty'
        self.pidfile_path = '/tmp/process_weather.pid'
        self.pidfile_timeout = 5

    def run(self):
        queue = Queue()

        for station in WeatherStation.objects.filter(active=True):
            p = Process(target=read_weather_data,
                        args=(station.name, station.ip_address, station.port,
                              queue,))
            p.start()

        while True:
            name, data = queue.get()
            print "Received data from ", name
            print data

app = App()
daemon_runner = runner.DaemonRunner(app)
daemon_runner.do_action()

Upvotes: 0

Views: 637

Answers (1)

slurms
slurms

Reputation: 768

Seem to have found a way to do this, but am unsure about whether this is the best approach to take.

from daemon import runner
from multiprocessing import Process, Queue, Event
import telnetlib

from django.utils.encoding import force_text

from observations.weather.models import WeatherStation
import os
import signal
import errno

os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'


def read_weather_data(name, ip_address, port, queue, exit):
    print "Started process to get data for", name
    client = telnetlib.Telnet(ip_address, port)

    while not exit.is_set():
        response = client.read_until('\r\n'.encode('utf8'))
        queue.put((name, force_text(response)))

    print "exit called for", name
    client.close()


def exit_handler(signum, frame):
    print "exiting..."


class App(object):
    def __init__(self):
        self.stdin_path = '/dev/null'
        self.stdout_path = '/dev/tty'
        self.stderr_path = '/dev/tty'
        self.pidfile_path = '/tmp/process_weather.pid'
        self.pidfile_timeout = 5

    def run(self):
        exit = Event()

        def exit_handler(signum, frame):
            print "exiting..."
            exit.set()

        signal.signal(signal.SIGTERM, exit_handler)
        queue = Queue()

        workers = []
        for station in WeatherStation.objects.filter(active=True):
            p = Process(target=read_weather_data,
                        args=(station.name, station.ip_address, station.port,
                              queue, exit))
            workers.append(p)

        for worker in workers:
            worker.start()

        while True:
            try:
                name, data = queue.get()
            except IOError as e:
                # we received a signal whilst waiting for I/O
                if e.errno != errno.EINTR:
                    raise
                else:
                    break
            print "Received data from ", name
            print data

        for worker in workers:
            worker.join()


app = App()
daemon_runner = runner.DaemonRunner(app)
daemon_runner.do_action()

Upvotes: 2

Related Questions