I have an Iterable of "work units" that need to be performed, in no particular order, and that can easily run in parallel without interfering with one another.
Unfortunately, running too many of them at once will exceed my available RAM, so I need to make sure that only a handful are running at any given time.
At the most basic, I want a function of this type signature:
parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]
such that the output Iterator is not necessarily in the same order as the input (if I want to keep track of which input produced a result, I can output a pair with the input or something similar). The consumer can then consume the resulting iterator incrementally without exhausting the machine's memory, while keeping as much parallelism as this task allows.
Furthermore, I want the function to be as efficient as possible. My first idea was something along these lines:
xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)
where I hoped the toSet would tell Scala's parallel collection that it could start producing elements from its iterator as soon as they were ready, in any order, and the grouped call would limit the number of simultaneous workers. Unfortunately, the toSet call doesn't seem to have the desired effect (in my experiments, the results come back in the same order as they would without the par call), and the grouped call is suboptimal. For example, with a group size of 100, if 99 of the jobs complete immediately on a dozen cores but one is particularly slow, most of the cores will sit idle until we can move on to the next group. It would be much cleaner to have an "adaptive window" that is at most as big as my chunk size but doesn't get held up by slow workers.
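To make the idle-core argument concrete, here is a small simulation (a sketch, not a benchmark) comparing the total time of fixed batches against a sliding window of the same size. It assumes every job in a batch runs fully in parallel (window size no larger than the core count), ignores scheduling overhead, and uses durations in arbitrary time units; the names WindowSimulation, groupedMakespan and slidingMakespan are hypothetical.

```scala
import scala.collection.mutable

object WindowSimulation {
  // Fixed batches (like xs.grouped(window)): each batch runs fully in parallel
  // (assumes window <= number of cores), and the next batch cannot start until
  // the slowest job of the current batch has finished.
  def groupedMakespan(durations: Seq[Long], window: Int): Long =
    durations.grouped(window).map(_.max).sum

  // Sliding ("adaptive") window: at most `window` jobs run at once, and the next
  // job in input order starts as soon as any running job finishes.
  def slidingMakespan(durations: Seq[Long], window: Int): Long = {
    require(window > 0, "window must be positive")
    // Min-heap (reversed ordering) of the times at which each slot becomes free.
    val freeAt = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)
    freeAt ++= Seq.fill(window)(0L)
    var makespan = 0L
    for (d <- durations) {
      val start = freeAt.dequeue() // earliest free slot
      val end = start + d
      freeAt.enqueue(end)
      makespan = math.max(makespan, end)
    }
    makespan
  }

  def main(args: Array[String]): Unit = {
    // One slow job (10 units) among seven fast ones (1 unit each), window of 4.
    val jobs = Seq(10L, 1L, 1L, 1L, 1L, 1L, 1L, 1L)
    println(s"grouped: ${groupedMakespan(jobs, 4)}") // grouped: 11
    println(s"sliding: ${slidingMakespan(jobs, 4)}") // sliding: 10
  }
}
```

With batches, the second batch of fast jobs waits behind the slow job, so the total is 10 + 1 = 11; with a sliding window, the three free slots finish all seven fast jobs while the slow one is still running, so the total is just the slow job's 10.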
I can imagine writing something like this myself with a work-stealing deque or something along those lines, but I suspect a lot of the hard work with the concurrency primitives is already done for me somewhere in Scala's parallel collections library. Does anyone know which parts of it I could reuse to build this functionality, or have other suggestions on how to implement such an operation?
The parallel collections framework lets you specify the maximum number of threads to use for a given task. Using Scala 2.10, you'd do:
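import scala.collection.parallel.{ForkJoinTaskSupport, ParIterable}

def parMap[A, B](x: Iterable[A], f: A => B, chunkSize: Int): ParIterable[B] = {
  val px = x.par
  // Run this collection's operations on a pool with at most chunkSize worker threads
  px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize))
  px map f
}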
This will prevent more than chunkSize operations from running at any one time. It uses a work-stealing strategy underneath to keep the worker threads busy, so it doesn't suffer from the same problem as your grouped example above.
Doing it this way won't reorder the results into first-completed order, however. For that, I'd suggest turning your operation into an actor and having a small pool of actors run the operations and send the results back to you as they complete.
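The actor version isn't spelled out here. As a lighter-weight sketch of the same "results come back as they complete" idea, the question's parMap signature can be implemented with java.util.concurrent's ExecutorCompletionService over a fixed thread pool. This is a swapped-in technique (thread pool plus completion queue, not actors), and the object name UnorderedParMap is hypothetical. It keeps at most chunkSize tasks submitted but not yet consumed, and refills the window as soon as any task finishes, so one slow task doesn't stall the rest.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

object UnorderedParMap {
  // Hypothetical helper: applies f to every element of xs on at most `chunkSize`
  // threads and yields results in completion order, not input order.
  // At most `chunkSize` tasks are submitted-but-unconsumed at any time,
  // which bounds the memory held by in-flight work.
  def parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val pool = Executors.newFixedThreadPool(chunkSize)
    val ecs = new ExecutorCompletionService[B](pool)
    var inFlight = 0 // only touched by the consuming thread

    def submitNext(): Unit = {
      val a = xs.next()
      // Explicit Callable avoids overload ambiguity in submit.
      ecs.submit(new Callable[B] { def call(): B = f(a) })
      inFlight += 1
    }

    // Fill the initial window.
    while (inFlight < chunkSize && xs.hasNext) submitNext()
    if (inFlight == 0) pool.shutdown()

    new Iterator[B] {
      def hasNext: Boolean = inFlight > 0

      def next(): B = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        val done = ecs.take() // blocks until *some* task has finished
        inFlight -= 1
        // Refill the window as soon as a slot frees up.
        if (xs.hasNext) submitNext()
        else if (inFlight == 0) pool.shutdown()
        done.get() // rethrows an exception from f wrapped in ExecutionException
      }
    }
  }
}
```

For example, UnorderedParMap.parMap((1 to 20).iterator, (i: Int) => i * i, 4).toList contains the twenty squares in whatever order the tasks happen to finish. The pool's threads are non-daemon, and this sketch only shuts the pool down once the iterator is fully drained; if the consumer stops early or f throws, a real version would need some close or cleanup hook.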