Reputation: 29
There are 5 files: main.py, worker.py, cat.py, dog.py and rabbit.py. cat, dog and rabbit inherit from worker and implement worker_run().
In main.py I prepare 3 processes to execute, but I don't know how to limit the number of processes running at the same time (e.g. 2 processes).
I have tried using multiprocessing.Pool, but it only supports functions outside a class (?).
main.py:
from multiprocessing import Process
from cat import *
from dog import *
from rabbit import *

p1 = cat()
p2 = dog()
p3 = rabbit()

p1.start()
p2.start()
p3.start()

p1.join()
p2.join()
p3.join()
worker.py:
import abc
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        print "Init"
        self.value = None

    def run(self):
        print "Running"
        self.worker_run()

    @abc.abstractmethod
    def worker_run(self):
        """ implement """
        return
cat.py:
from worker import *

class cat(Worker):
    def worker_run(self):
        for i in range(10000):
            print "cat run"
dog.py:
from worker import *

class dog(Worker):
    def worker_run(self):
        for i in range(10000):
            print "dog run"
rabbit.py:
from worker import *

class rabbit(Worker):
    def worker_run(self):
        for i in range(10000):
            print "rabbit run"
Upvotes: 0
Views: 103
Reputation: 102039
If you want to let at most two methods run concurrently and block the third one until one of the others has stopped, you have to use a Semaphore.
You must pass the semaphore to the objects so that they can acquire it. In your main file you create the semaphore and pass it to the objects:
from multiprocessing import Process, Semaphore
from cat import *
from dog import *
from rabbit import *
semaphore = Semaphore(2) # at most 2 processes running concurrently
p1 = cat(semaphore)
p2 = dog(semaphore)
p3 = rabbit(semaphore)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
You can then modify the Worker class to acquire the semaphore before running worker_run:
import abc
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, semaphore):
        multiprocessing.Process.__init__(self)
        print "Init"
        self.value = None
        self.semaphore = semaphore

    def run(self):
        with self.semaphore:
            print "Running"
            self.worker_run()

    @abc.abstractmethod
    def worker_run(self):
        """ implement """
        return
This should ensure that at most 2 worker_run methods are running concurrently.
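If you want to convince yourself that the cap works, here is a minimal, self-contained sketch (the sleepy function, its name, and the two-second sleep are just illustrative, not part of your files): with Semaphore(2), the third process only enters its critical section after one of the first two releases the semaphore.

import time
from multiprocessing import Process, Semaphore

def sleepy(name, sema):
    # hold the semaphore for two seconds, printing when we enter and leave
    with sema:
        print "%s acquired at %.1f" % (name, time.time())
        time.sleep(2)
        print "%s released at %.1f" % (name, time.time())

if __name__ == '__main__':
    sema = Semaphore(2)
    procs = [Process(target=sleepy, args=(n, sema))
             for n in ('cat', 'dog', 'rabbit')]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # the timestamps show two processes entering immediately
    # and the third roughly two seconds later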
In fact, I believe you are making things more complex than they ought to be. You do not have to subclass Process. You can achieve exactly the same functionality using the target argument:
from multiprocessing import Process, Semaphore
from cat import Cat
from dog import Dog
from rabbit import Rabbit

semaphore = Semaphore(2)

cat = Cat()
dog = Dog()
rabbit = Rabbit()

def run(animal, sema):
    with sema:
        animal.worker_run()

cat_proc = Process(target=run, args=(cat, semaphore))
dog_proc = Process(target=run, args=(dog, semaphore))
rabbit_proc = Process(target=run, args=(rabbit, semaphore))

cat_proc.start()
dog_proc.start()
rabbit_proc.start()

cat_proc.join()
dog_proc.join()
rabbit_proc.join()
In fact, with a little change you can get rid of the Semaphore and simply use the Pool object:
from multiprocessing import Pool
from cat import Cat
from dog import Dog
from rabbit import Rabbit

cat = Cat()
dog = Dog()
rabbit = Rabbit()

def run(animal):
    animal.worker_run()

pool = Pool(2)
pool.map(run, [cat, dog, rabbit])
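Note that Pool(2) achieves the limit differently: it spawns exactly two worker processes and feeds them the three tasks in turn, so the cap on concurrency falls out of the pool size itself rather than a separate synchronization primitive.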
The problem you had is that you cannot pass a method as the target argument, or as the callable to Pool.map, because methods cannot be pickled (see What can be pickled and unpickled?). The multiprocessing module uses the pickle protocol to communicate between processes, so everything it handles must be picklable.
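You can see the limitation for yourself in a Python 2 session (Foo here is just a throwaway example class, not part of your code):

import pickle

class Foo(object):
    def bar(self):
        pass

pickle.dumps(Foo())      # fine: a plain instance can be pickled
pickle.dumps(Foo().bar)  # fails: bound methods are not picklable in Python 2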
In particular, the standard workaround for unpicklable methods is to use a module-level function to which you explicitly pass the instance as the first argument, as I did above. This is exactly what happens with method calls anyway, but there the interpreter does it automatically; in this case you have to handle it explicitly.
Upvotes: 2