Reputation: 1557
I'm writing a machine learning program with the following components:
A shared "Experience Pool" with a binary-tree-like data structure.
N simulator processes. Each adds an "experience object" to the pool every once in a while. The pool is responsible for balancing its tree.
M learner processes that sample a batch of "experience objects" from the pool every few moments and perform whatever learning procedure.
I don't know what's the best way to implement the above. I'm not using Tensorflow, so I cannot take advantage of its parallel capability. More concretely, I'm looking at the multiprocessing library. Unlike multithreading, however, the multiprocessing module cannot have different processes update the same global object. My hunch is that I should use the server-proxy model. Could anyone please give me a rough skeleton code to start with? I've also looked at celery, disque, etc., but it's not obvious to me how to adapt them to my use case.
Upvotes: 0
Views: 289
Reputation: 9846
Based on the comments, what you're really looking for is a way to update a shared object from a set of processes that are carrying out a CPU-bound task. Being CPU-bound makes multiprocessing an obvious choice - if most of your work were IO-bound, multithreading would have been the simpler choice.
Your problem follows a simple server-client model: the clients use the server as a plain stateful store, no communication between any child processes is needed, and no process needs to be synchronised with another.
Thus, the simplest way to do this is to:
- run a single server process that owns the shared experience pool, and
- have every simulator and learner connect to it as a client, calling its update/sample methods through a proxy.
From the server's perspective, the identity of the clients doesn't matter - only their actions do. This can be accomplished with a customised manager in multiprocessing, like so:
# server.py
from multiprocessing.managers import BaseManager

# this represents the data structure you've already implemented.
from ... import ExperienceTree

# An important note: a proxy only keeps its referent alive in the manager
# process for as long as at least one proxy to it exists. If all of your
# workers die, their proxies go with them and the instance they pointed to
# is garbage-collected once all references to it have been erased. I have
# chosen to sidestep this in my code by using class variables and
# classmethods so that no instance state is relied on - you may define
# __init__, etc. if you so wish, but just be aware of what will happen to
# your object once all workers are gone.
class ExperiencePool(object):
    tree = ExperienceTree()

    @classmethod
    def update(cls, experience_object):
        ''' Implement methods to update the tree with an experience object. '''
        cls.tree.update(experience_object)

    @classmethod
    def sample(cls):
        ''' Implement methods to sample the tree's experience objects. '''
        return cls.tree.sample()

# subclass the base manager
class Server(BaseManager):
    pass

# register the class you just created - now you can access an instance of
# ExperiencePool using Server.Shared_Experience_Pool().
Server.register('Shared_Experience_Pool', ExperiencePool)

if __name__ == '__main__':
    # run the server on port 8080 of your own machine.
    # note: get_server() only works on a manager that hasn't been started yet,
    # so don't wrap this in a `with` block (which would call start() first).
    manager = Server(('localhost', 8080), authkey=b'none')
    manager.get_server().serve_forever()
Now for all of your clients you can just do:
# client.py - you can always have a separate client file for a learner and a simulator.
from multiprocessing.managers import BaseManager

from server import ExperiencePool

class Server(BaseManager):
    pass

# what matters on the client side is registering the same name; the server
# is the one that actually creates and holds the ExperiencePool.
Server.register('Shared_Experience_Pool', ExperiencePool)

if __name__ == '__main__':
    # connect to the server running on port 8080 of your own machine.
    server_process = Server(('localhost', 8080), authkey=b'none')
    server_process.connect()
    experience_pool = server_process.Shared_Experience_Pool()
    # now do your own thing and call `experience_pool.sample()` or
    # `experience_pool.update(...)` whenever you want.
You may then launch one server.py and as many workers as you want.
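For instance, a simulator worker might look like the sketch below. This isn't part of the skeleton above: the file name, the one-second pacing, and the dictionary standing in for an experience object are all placeholders, and it assumes the server from server.py is already running on localhost:8080 with the same authkey.

# worker.py - a hypothetical simulator client (names and pacing are illustrative)
import random
import time
from multiprocessing.managers import BaseManager

class Server(BaseManager):
    pass

# on a pure client it is enough to register the name only; the pool itself
# lives in the server process.
Server.register('Shared_Experience_Pool')

if __name__ == '__main__':
    manager = Server(('localhost', 8080), authkey=b'none')
    manager.connect()
    pool = manager.Shared_Experience_Pool()
    while True:
        # stand-in for whatever experience object your simulator produces
        experience = {'observation': random.random(), 'reward': random.random()}
        pool.update(experience)
        time.sleep(1.0)

A learner client would look the same, except its loop would call pool.sample() and run its learning step on the returned batch.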
Note that this is not always race-free: your learners may receive stale or old data if they have to compete with a simulator node that is writing at the same time. If you want reads to reflect the latest writes, you can additionally take a lock whenever a simulator writes, preventing the other processes from reading until the write finishes.
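One way to do that, sketched below rather than taken from the skeleton above: every proxied call is executed inside the manager process, and the manager handles each client connection in its own thread, so a plain threading.Lock held by the ExperiencePool from server.py is enough to serialise access.

# server.py (excerpt) - guarding the tree with a server-side lock
# assumes the same `from ... import ExperienceTree` as in server.py above
import threading

class ExperiencePool(object):
    tree = ExperienceTree()
    lock = threading.Lock()

    @classmethod
    def update(cls, experience_object):
        # a writer holds the lock so a concurrent sample never sees a
        # half-updated tree
        with cls.lock:
            cls.tree.update(experience_object)

    @classmethod
    def sample(cls):
        with cls.lock:
            return cls.tree.sample()

This serialises all reads and writes; if sampling turns out to be expensive, finer-grained locking inside ExperienceTree itself may be worth it, but that depends on your tree implementation.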
Upvotes: 1