Reputation: 16870
I'm trying hard to figure out how I can process a directed acyclic graph in parallel. Each node should only be able to "execute" once all of its input nodes have been processed. Imagine a class Task with the following interface:
class Task(object):
    result = None

    def inputs(self):
        ''' List all requirements of the task. '''
        return ()

    def run(self):
        pass
I cannot think of a way to process the graph represented by this structure asynchronously with a maximum number of concurrent workers, except for one method.
I think optimal processing would be achieved by creating a thread for each task that waits for all of its inputs to be processed. But spawning a thread for every task immediately, instead of only when the task is actually ready to be processed, does not sound like a good idea to me.
import threading

class Runner(threading.Thread):
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        # Start the thread as soon as the Runner is created.
        self.start()

    def run(self):
        # Recursively spawn a Runner for every input task, wait for
        # all of them to finish, then run this task.
        threads = [Runner(r) for r in self.task.inputs()]
        [t.join() for t in threads]
        self.task.run()
Is there a way to achieve this behaviour more elegantly? Also, this approach currently does not provide a way to limit the number of tasks running at a time.
Upvotes: 6
Views: 3458
Reputation: 16870
Stumbling back to this question years later, I would recommend any soul crossing this road to look at the topological sort algorithm. At every step of the algorithm, you look at all the nodes in the graph that have an in-degree of zero. All such nodes may be processed in parallel. Limiting the number of nodes processed in parallel this way is quite trivial, as you can simply decide not to push all of those nodes into the worker queue at once. Remove the fully processed nodes from the graph, which should leave new nodes with an in-degree of zero, unless there's a cycle in the graph.
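A minimal sketch of that idea, assuming the Task interface from the question and that tasks is a list containing every node of the graph; the name run_dag and the max_workers parameter are made up for illustration. A concurrent.futures.ThreadPoolExecutor caps how many tasks run at once, and each completed task "removes" itself by decrementing the in-degree of its dependents:
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_dag(tasks, max_workers=4):
    # in_degree: number of unfinished inputs per task.
    # dependents: reverse edges, i.e. input -> tasks that depend on it.
    in_degree = {t: len(t.inputs()) for t in tasks}
    dependents = {t: [] for t in tasks}
    for t in tasks:
        for dep in t.inputs():
            dependents[dep].append(t)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Start with every task whose in-degree is zero.
        pending = {pool.submit(t.run): t for t in tasks if in_degree[t] == 0}
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                finished = pending.pop(future)
                future.result()  # re-raise any exception from the task
                # "Remove" the finished node: each dependent loses one input.
                for dep in dependents[finished]:
                    in_degree[dep] -= 1
                    if in_degree[dep] == 0:
                        pending[pool.submit(dep.run)] = dep

    if any(in_degree.values()):
        raise ValueError('cycle detected: some tasks never became ready')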
Upvotes: 0
Reputation: 601639
Have one master thread push items to a queue once they are ready to be processed. Then have a pool of workers listen on the queue for tasks to work on. (Python provides a synchronized queue in the Queue module, renamed to lower-case queue in Python 3.)
The master first creates a map from dependencies to dependent tasks. Every task that doesn't have any dependencies can go into the queue right away. Every time a task is completed, the master uses that map to figure out which dependent tasks there are, and puts them into the queue if all of their dependencies are now met.
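A rough sketch of this scheme, assuming the Task interface from the question and an acyclic graph; the names run_with_queue, work_queue and done_queue are made up for illustration, and error handling is kept minimal:
import queue  # Queue on Python 2
import threading

def run_with_queue(tasks, num_workers=4):
    work_queue = queue.Queue()   # tasks that are ready to run
    done_queue = queue.Queue()   # workers report finished tasks here

    def worker():
        while True:
            task = work_queue.get()
            if task is None:      # sentinel: shut this worker down
                break
            try:
                task.run()
            finally:
                # Report completion even if the task failed, so the
                # master does not block forever.
                done_queue.put(task)

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # Master bookkeeping: dependency -> dependent tasks, plus a count
    # of unfinished dependencies per task.
    dependents = {t: [] for t in tasks}
    remaining = {t: len(t.inputs()) for t in tasks}
    for t in tasks:
        for dep in t.inputs():
            dependents[dep].append(t)

    # Seed the queue with every task that has no dependencies.
    for t in tasks:
        if remaining[t] == 0:
            work_queue.put(t)

    # Master loop: wait for completions and release newly ready tasks.
    unfinished = len(tasks)
    while unfinished:
        finished = done_queue.get()
        unfinished -= 1
        for dep in dependents[finished]:
            remaining[dep] -= 1
            if remaining[dep] == 0:
                work_queue.put(dep)

    for _ in workers:             # stop the workers
        work_queue.put(None)
    for w in workers:
        w.join()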
Upvotes: 2
Reputation: 36
Celery (http://www.celeryproject.org/) is a widely used distributed task queue for Python. It should be able to help you with this.
Upvotes: 1