BugSpray

Reputation: 143

Pass method return values between multiprocessing.Process instances

How can I get the return value of a method running in another multiprocessing.Process instance?

I have two files:

file hwmgr.py:

import multiprocessing as mp
from setproctitle import setproctitle
import smbus
import myLoggingModule as log

class HWManager(mp.Process):
    def __init__(self, cmd_q, res_q):
        mp.Process.__init__(self)
        self.i2c_lock = mp.Lock()
        self.commandQueue = cmd_q
        self.responseQueue = res_q
    def run(self):
        setproctitle('hwmgr')
        while True:
            cmd, args = self.commandQueue.get()
            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            if result is not None:
                self.responseQueue.put(result)

    def get_voltage(self):
        with self.i2c_lock:
            voltage = 12.0  # ...do i2c stuff to get a voltage with smbus module
        return voltage

file main.py:

import multiprocessing as mp
import hwmgr

cmd_q = mp.Queue()
res_q = mp.Queue()

hwm = hwmgr.HWManager(cmd_q, res_q)
hwm.start()

cmd_q.put(('get_voltage', {}))
battery = res_q.get()

print(battery)

While this solution works, the HWManager process is likely to grow more complex in the future, and other processes spawned from main.py (the code is simplified) use the same mechanism. There's obviously a chance that the wrong process will pick up the wrong return data from its res_q.get() call.

What would be a more robust way of doing this?
(I'm trying to avoid having one return mp.Queue for each of the other processes, as that would require reworking the HWManager class each time to accommodate the additional Queues.)
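
One pattern that keeps the single command queue but avoids a shared response queue is to send a per-request reply handle along with each command. A rough sketch of the idea, assuming Python 3 on Unix, where multiprocessing can transfer Pipe connection objects through a Queue between related processes; hw_manager and client below are illustrative stand-ins, not the real classes:

import multiprocessing as mp

def hw_manager(cmd_q):
    while True:
        cmd, args, reply_conn = cmd_q.get()
        if cmd is None:
            break
        result = 12.0              # placeholder for the real i2c work
        reply_conn.send(result)    # only the caller that sent this request holds the other end
        reply_conn.close()

def client(cmd_q):
    recv_end, send_end = mp.Pipe(duplex=False)
    cmd_q.put(('get_voltage', {}, send_end))   # ship the reply end with the request
    print(recv_end.recv())                     # no other process can consume this reply

if __name__ == '__main__':
    cmd_q = mp.Queue()
    mp.Process(target=hw_manager, args=(cmd_q,), daemon=True).start()
    mp.Process(target=client, args=(cmd_q,)).start()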

OK - WIP code is as follows:

hwmgr.py:

import multiprocessing as mp
from multiprocessing.connection import Listener
from setproctitle import setproctitle
import smbus

class HWManager(mp.Process):
    def __init__(self):
        mp.Process.__init__(self)
        self.i2c_lock = mp.Lock()

    def run(self):
        setproctitle('hwmgr')
        self.listener = Listener('/tmp/hw_sock', 'AF_UNIX')

        with self.i2c_lock:
            pass  # Set up I2C bus to take ADC readings

        while True:
            conn = self.listener.accept()
            cmd, args = conn.recv()

            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            conn.send(result)

    def get_voltage(self):
        with self.i2c_lock:
            voltage = 12.0  # Actually, do i2c stuff to get a voltage with smbus module

        return voltage

file client.py:

import multiprocessing as mp
from multiprocessing.connection import Client
from setproctitle import setproctitle
from time import sleep

class HWClient(mp.Process):

    def __init__(self):
        mp.Process.__init__(self)
        self.client = Client('/tmp/hw_sock', 'AF_UNIX')

    def run(self):
        setproctitle('client')
        while True:
            self.client.send(('get_voltage', {}))
            battery = self.client.recv()
            print(battery)
            sleep(5)

main.py:

import hwmgr
import client

cl = client.HWClient()  # Put these lines here = one error (conn refused)
cl.start()
hwm = hwmgr.HWManager()
hwm.start()
# cl = client.HWClient()  # ...Or here, gives the other (in use)
# cl.start()
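
One likely cause of the "conn refused" error is that HWClient.__init__ runs in the parent process before the HWManager process has created its Listener, so there is nothing to connect to yet. A rough workaround is to connect lazily inside run() and retry until the socket is up; a sketch (the retry count and sleep interval are arbitrary):

import multiprocessing as mp
from multiprocessing.connection import Client
from setproctitle import setproctitle
from time import sleep

class HWClient(mp.Process):

    def run(self):
        setproctitle('client')
        conn = None
        for _ in range(50):            # give hwmgr time to bind /tmp/hw_sock
            try:
                conn = Client('/tmp/hw_sock', 'AF_UNIX')
                break
            except OSError:            # socket missing or refusing connections
                sleep(0.1)
        if conn is None:
            raise RuntimeError('hwmgr never became available')
        while True:
            conn.send(('get_voltage', {}))
            print(conn.recv())
            sleep(5)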

Upvotes: 0

Views: 924

Answers (1)

spinlok

Reputation: 3661

This sounds like it calls for a standard client-server architecture. You can use UNIX domain sockets (or named pipes on Windows). The multiprocessing module makes it super easy to pass Python objects between processes over such connections. Sample structure of the server code:

from multiprocessing.connection import Listener
from threading import Thread
from queue import Queue  # "Queue" module on Python 2

listener = Listener('somefile', 'AF_UNIX')

queue = Queue()

def worker():
    while True:
        conn, cmd = queue.get()
        result = execute_cmd(cmd)  # execute_cmd is whatever command dispatch you supply
        conn.send(result)
        queue.task_done()


for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()


while True:
    conn = listener.accept()
    cmd = conn.recv()
    queue.put((conn, cmd))  # Processing happens in a worker thread, which writes the result back to conn

The client side would look like:

from multiprocessing.connection import Client
client = Client('somefile', 'AF_UNIX')

client.send(cmd)
result = client.recv()

The above code uses threads for workers, but you could just as easily have processes for workers using the multiprocessing module. See the docs for details.
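
For instance, a rough pre-forked variant, where each worker process accepts connections on the shared listener (this assumes the default fork start method on Unix so children inherit the socket; execute_cmd is a placeholder for real command handling):

import multiprocessing as mp
from multiprocessing.connection import Listener

def execute_cmd(cmd):
    return 'result for %r' % (cmd,)    # placeholder command handler

def worker(listener):
    while True:
        conn = listener.accept()       # the kernel hands each connection to one worker
        cmd = conn.recv()
        conn.send(execute_cmd(cmd))
        conn.close()

if __name__ == '__main__':
    listener = Listener('somefile', 'AF_UNIX')
    workers = [mp.Process(target=worker, args=(listener,), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()                       # keep the parent (and the listener) alive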

Upvotes: 1
