code base 5000

Reputation: 4102

Python, Using Remote Managers and Multiprocessing

I want to use the remote manager functions in the multiprocessing module to distribute work among many machines. I know there are 3rd party modules, but I want to stick with the core library as much as possible. I know that on a desktop (single machine) you can use the multiprocessing.Pool class to limit the number of CPUs, but I have a couple of questions about remote managers.

I have the following code for the remote manager:

    from multiprocessing.managers import BaseManager
    import Queue

    queue = Queue.Queue()

    class QueueManager(BaseManager): pass
    QueueManager.register('get_queue', callable=lambda: queue)

    # listen on all interfaces, port 50000
    m = QueueManager(address=('', 50000), authkey='abracadabra')
    s = m.get_server()
    s.serve_forever()
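
For reference, a minimal sketch of the same server on Python 3, where the Queue module is renamed to queue and the authkey must be a bytes object (same port and key as above; job_queue is just a renamed variable):

    from multiprocessing.managers import BaseManager
    import queue

    job_queue = queue.Queue()

    class QueueManager(BaseManager): pass
    QueueManager.register('get_queue', callable=lambda: job_queue)

    # on Python 3 the authkey must be bytes
    m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    m.get_server().serve_forever()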

The server works great, and I can even submit a job into the queue using the following client code:

    QueueManager.register('get_queue')
    m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
    m.connect()
    queue = m.get_queue()
    queue.put('hello')

You can also call queue.get() to retrieve a single entry from the queue.
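
For example, a worker on another machine can pull that entry back out with the same kind of client code ('machinename' and the auth key are the same placeholders as above):

    QueueManager.register('get_queue')
    m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
    m.connect()
    queue = m.get_queue()
    print queue.get()   # blocks until an item is available, then prints 'hello'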

  1. How do you get the items in the queue? When I tried to iterate over the queue, I ended up in an infinite loop.
  2. On the workers, can you limit each machine to one job at a time?
  3. Since this approach seems to be a pull method, where the workers need to check whether a job exists, can there be a push method where the multiprocessing server can be triggered?

Upvotes: 2

Views: 6113

Answers (1)

Bakuriu

Reputation: 101909

Iterating over a queue is the same as doing:

    while True:
        elem = queue.get()  # queue empty -> it blocks!!!

An elegant way to "iterate" over a queue and stop your worker process when there are no more jobs to execute is to use None (or some other value) as a sentinel and use iter(callable, sentinel):

    for job in iter(queue.get, None):
        # execute the calculation
        output_queue.put(result)

    # shut down the worker process

Which is equivalent to:

    while True:
        job = queue.get()
        if job is None:
            break
        # execute the calculation
        output_queue.put(result)
    # shut down the worker process

Note that you have to insert one sentinel into the queue for each worker subprocess, otherwise there will be subprocesses left waiting forever for one.
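
For example, the producer side of that pattern could look like the sketch below (NUM_WORKERS is a hypothetical count of worker processes, not something taken from the question):

    # enqueue the jobs, then one sentinel per worker process
    NUM_WORKERS = 4
    for job in ['job1', 'job2', 'job3']:
        queue.put(job)
    for _ in range(NUM_WORKERS):
        queue.put(None)    # each worker consumes exactly one sentinel and exits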

Regarding your second question, I don't understand what you are asking. The BaseManager provides one server that executes the calls from the clients, so, obviously, all requests are served by the same machine. Or do you mean allowing each client to run only one request at a time? I don't see a built-in option for this, even though it could be implemented "by hand".
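
If that is what you mean, one possible "by hand" approach (just a sketch, not something BaseManager provides) is to run exactly one worker process on each machine, so a machine never works on more than one job at a time; do_work below is a hypothetical placeholder for the real calculation:

    # worker.py -- run a single copy of this per machine
    from multiprocessing.managers import BaseManager

    class QueueManager(BaseManager): pass
    QueueManager.register('get_queue')

    m = QueueManager(address=('machinename', 50000), authkey='abracadabra')
    m.connect()
    queue = m.get_queue()

    for job in iter(queue.get, None):
        result = do_work(job)   # do_work: placeholder for the actual calculation
        # ...store or report the result here...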

Regarding your third question, I don't understand it. What do you mean by a pull method? Can you rephrase it with a bit more detail on what you mean by "a push method where the multiprocessing server can be triggered"?

Upvotes: 1
