someday

Reputation: 29

How to control the maximum concurrently running processes?

There are 5 files: main.py, worker.py, cat.py, dog.py and rabbit.py. cat, dog and rabbit inherit from Worker and implement worker_run().

In main.py I prepare 3 processes to execute, but I don't know how to limit the number of processes running at the same time (e.g. to 2).

I have tried using multiprocessing.Pool, but it seems to support only functions defined outside a class (?).

main.py:

from multiprocessing import Process
from cat import *
from dog import *
from rabbit import *

p1 = cat()
p2 = dog()
p3 = rabbit()
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

worker.py:

import abc
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        print "Init"
        self.value = None

    def run(self):
        print "Running"
        self.worker_run()

    @abc.abstractmethod
    def worker_run(self):
        """ implement """
        return

cat.py:

from worker import *

class cat(Worker):
    def worker_run(self):
        for i in range(10000):
            print "cat run"

dog.py:

from worker import *

class dog(Worker):
    def worker_run(self):
        for i in range(10000):
            print "dog run"

rabbit.py:

from worker import *

class rabbit(Worker):
    def worker_run(self):
        for i in range(10000):
            print "rabbit run"

Upvotes: 0

Views: 103

Answers (1)

Bakuriu

Reputation: 102039

If you want at most two methods to run concurrently, with the third blocked until one of the others has finished, you have to use a Semaphore.

You must pass the semaphore to the object methods so that they can acquire it. In your main file you create the semaphore and pass it to the objects:

from multiprocessing import Process, Semaphore
from cat import *
from dog import *
from rabbit import *

semaphore = Semaphore(2)   # at most 2 processes running concurrently
p1 = cat(semaphore)
p2 = dog(semaphore)
p3 = rabbit(semaphore)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

You can then modify the Worker class to acquire the semaphore before running worker_run:

class Worker(multiprocessing.Process):
    def __init__(self, semaphore):
        multiprocessing.Process.__init__(self)
        print "Init"
        self.value = None
        self.semaphore = semaphore

    def run(self):
        with self.semaphore:
            print "Running"
            self.worker_run()

    @abc.abstractmethod
    def worker_run(self):
        """ implement """
        return

This should ensure that at most 2 worker_run methods are running concurrently.
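To see the limiting effect in isolation, here is a minimal sketch that counts how many workers are inside the guarded section at once. It uses threads and threading.Semaphore instead of processes only so that it stays short and runs anywhere; multiprocessing.Semaphore is used the same way (as a context manager). The helper name peak_concurrency and the sleep that stands in for worker_run are assumptions made for illustration.

```python
import threading
import time


def peak_concurrency(n_workers, limit):
    """Run n_workers threads gated by Semaphore(limit).

    Returns the largest number of workers that were ever inside
    the guarded section at the same time.
    """
    semaphore = threading.Semaphore(limit)
    lock = threading.Lock()            # protects the counters below
    state = {"active": 0, "peak": 0}

    def worker():
        with semaphore:                # blocks while `limit` workers are inside
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.05)           # stand-in for worker_run()
            with lock:
                state["active"] -= 1

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


# With 3 workers and a limit of 2, the peak never exceeds 2.
print(peak_concurrency(3, 2))
```

The third worker only enters the with block once one of the first two has left it, which is the behaviour described above.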


In fact, I believe you are making things more complex than they need to be. You do not have to subclass Process; you can achieve exactly the same functionality using the target argument:

from multiprocessing import Process, Semaphore
from cat import Cat
from dog import Dog
from rabbit import Rabbit

semaphore = Semaphore(2)

cat = Cat()
dog = Dog()
rabbit = Rabbit()

def run(animal, sema):
    with sema:
        animal.worker_run()

cat_proc = Process(target=run, args=(cat, semaphore))
dog_proc = Process(target=run, args=(dog, semaphore))
rabbit_proc = Process(target=run, args=(rabbit, semaphore))

cat_proc.start()
dog_proc.start()
rabbit_proc.start()

cat_proc.join()
dog_proc.join()
rabbit_proc.join()

In fact, with a small change you can get rid of the Semaphore and simply use a Pool:

from multiprocessing import Pool
from cat import Cat
from dog import Dog
from rabbit import Rabbit


cat = Cat()
dog = Dog()
rabbit = Rabbit()

def run(animal):
    animal.worker_run()


pool = Pool(2)
pool.map(run, [cat, dog, rabbit])

The problem you had is that you cannot pass a method as the target argument or as the callable to Pool.map, because in Python 2 bound methods cannot be pickled (see What can be pickled and unpickled?). The multiprocessing module uses the pickle protocol to communicate between processes, so everything it handles must be picklable.

In particular, the standard workaround for unpicklable methods is to use a global function to which you explicitly pass the instance as the first argument, as I did above. This is exactly what happens with a method call, except that there the interpreter does it automatically; here you have to do it explicitly.
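The equivalence between a method call and a plain function that receives the instance can be checked directly. This is a minimal sketch: the Cat and Dog classes below are simplified stand-ins (plain classes, not Process subclasses), and worker_run returns a string instead of printing so that the results can be compared. The built-in map is used because it has the same call shape as Pool.map.

```python
class Cat(object):
    """Simplified stand-in for the Cat class used above."""

    def worker_run(self):
        return "cat run"


class Dog(object):
    """Simplified stand-in for the Dog class used above."""

    def worker_run(self):
        return "dog run"


def run(animal):
    # Module-level function: the instance is passed explicitly,
    # which is what the interpreter does implicitly for animal.worker_run().
    return animal.worker_run()


cat = Cat()

# All three spellings perform the same call.
print(cat.worker_run() == Cat.worker_run(cat) == run(cat))  # prints True

# Same call shape as pool.map(run, [cat, dog]), here with the built-in map.
results = list(map(run, [cat, Dog()]))
print(results)  # prints ['cat run', 'dog run']
```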

Upvotes: 2
