Reputation: 4102
I want to use the remote manager functions in the multiprocessing
module to distribute work among many machines. I know there are 3rd party modules, but I want to stick with core as much as possible. I know for desktop (single machine), you can use the multiprocessing.Pool
class to limit the number of CPUs, but have a couple of questions with remote managers.
I have the following code for the remote manager:
from multiprocessing.managers import BaseManager
import Queue
queue = Queue.Queue()
class QueueManager(BaseManager): pass
QueueManager.register('get_queue', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='abracadabra')
s = m.get_server()
s.serve_forever()
This works great, and I can even submit a job into the Queue using the following code:
QueueManager.register('get_queue')
m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
m.connect()
queue = m.get_queue()
queue.put('hello')
You can also the queue.get()
to get a single entry in the queue.
Upvotes: 2
Views: 6113
Reputation: 101909
Iterating over a queue is the same as doing:
while True:
elem = queue.get() #queue empty -> it blocks!!!
An elegant way to "iterate" over a queue and block your worker process when there are no more jobs to execute is to use None
(or something else) as a sentinel and use iter(callable, sentinel)
:
for job in iter(queue.get, None):
# execute the calculation
output_queue.put(result)
#shutdown the worker process
Which is equivalent to:
while True:
job = queue.get()
if job is None:
break
#execute the calculation
output_queue.put(result)
#shutdown the worker process
Note that you have to insert in the queu a sentinel for each worker subprocess, otherwise there will be subprocesses waiting for it.
Regarding your second question, I don't understand what you are asking. The BaseManager
provides one server that executes the calls from the clients, so, obviously, all requests are satisfied by the same machine.
Or do you mean allow each client to do only a request? I don't see any option for this, even though it could be implemented "by hand".
I don't understand your question. What is like a pull method? Can you rephrase your question with a bit more details on what you mean by "a push method where the multiprocessing server can be triggered"?
Upvotes: 1