Reputation: 28676
Is it somehow possible, using Scala's parallel collections, to parallelize an Iterator without evaluating it completely beforehand?
Here I am talking about parallelizing the functional transformations on an Iterator, namely map and flatMap.
I think this requires evaluating some elements of the Iterator in advance, and then computing more once some are consumed via next.
All I could find would require the iterator to be converted to an Iterable or, at best, a Stream. The Stream then gets completely evaluated when I call .par on it.
I also welcome implementation proposals if this is not readily available. Implementations should support parallel map and flatMap.
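As one possible answer to the implementation request, here is a minimal sketch that does not use parallel collections at all. It assumes Scala 2.12 or 2.13 and runs each element's computation as a scala.concurrent.Future on an implicit ExecutionContext. The names ParIteratorSketch, parMap, parFlatMap and the window parameter are hypothetical. At most window elements are pulled from the source ahead of the consumer, and results come back in input order.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParIteratorSketch {
  // Hypothetical helper: lazily maps `f` over `it` with at most `window`
  // computations in flight. Elements are pulled from `it` only to refill the
  // window, and results are returned in the original order.
  def parMap[A, B](it: Iterator[A], window: Int)(f: A => B)
                  (implicit ec: ExecutionContext): Iterator[B] = {
    require(window > 0, "window must be positive")
    new Iterator[B] {
      private val inFlight = mutable.Queue.empty[Future[B]]

      // Start computations until the window is full or the source is exhausted.
      private def fill(): Unit =
        while (inFlight.size < window && it.hasNext) {
          val a = it.next()
          inFlight.enqueue(Future(f(a)))
        }

      def hasNext: Boolean = { fill(); inFlight.nonEmpty }

      def next(): B = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        // Waits for the oldest result; an exception thrown by `f` is rethrown here.
        val result = Await.result(inFlight.dequeue(), Duration.Inf)
        fill() // top the window back up so work continues while the caller uses `result`
        result
      }
    }
  }

  // flatMap in terms of parMap: each `f(a)` is computed in parallel,
  // then the resulting sequences are flattened in order.
  def parFlatMap[A, B](it: Iterator[A], window: Int)(f: A => Seq[B])
                      (implicit ec: ExecutionContext): Iterator[B] =
    parMap(it, window)(f).flatMap(bs => bs)
}
```

The window bounds both memory use and how far evaluation runs ahead of the consumer; a larger window helps when per-element cost varies. Because next() waits with Await.result, calling it from a thread of a small fixed-size pool can tie that thread up.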
Upvotes: 16
Views: 7934
Reputation: 2698
I realize that this is an old question, but does the ParIterator implementation in the iterata library do what you were looking for?
scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
Upvotes: 6
Reputation: 111
It's a bit hard to follow exactly what you're after, but perhaps it's something like this:
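val f = (x: Int) => x + 1
// Split before mapping: calling .par on an already-mapped Stream just forces f sequentially.
// Scala 2.13+: .par needs the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._
val (left, right) = (0 to 9).toStream.splitAt(6)
val evaluated = left.par.map(f) // f runs in parallel on the first 6 elements
val s = right.map(f)            // a Stream over the rest; Stream.map evaluates only the head eagerly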
This evaluates f on the first 6 elements in parallel and then returns a stream over the rest.
Upvotes: 0
Reputation: 39587
From the scala-user mailing list thread "Traversing iterator elements in parallel":
https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ
I moved off Future.traverse for a similar reason. For my use case, keeping N jobs working, I wound up with code to throttle feeding the execution context from the job queue.
My first attempt involved blocking the feeder thread, but that risked also blocking tasks which wanted to spawn tasks on the execution context. What do you know, blocking is evil.
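The post itself shows no code. As a hedged illustration of throttling without a blocked feeder thread, here is a minimal sketch assuming Scala 2.12 or 2.13 and jobs that already return a Future[Unit]; Throttle and runThrottled are hypothetical names, and this is not the code from the thread. Instead of one feeder, n "lanes" each pull the next element only when their previous job's Future completes, so at most n jobs are in flight and no thread waits for a free slot.

```scala
import scala.concurrent.{ExecutionContext, Future}

object Throttle {
  // Hypothetical helper: runs `job` on every element of `jobs` with at most `n`
  // jobs in flight, without any thread blocking while it waits for a free slot.
  def runThrottled[A](jobs: Iterator[A], n: Int)(job: A => Future[Unit])
                     (implicit ec: ExecutionContext): Future[Unit] = {
    require(n > 0, "n must be positive")
    val lock = new Object

    // Iterator is not thread-safe, so lanes pull elements under a lock.
    def nextJob(): Option[A] = lock.synchronized {
      if (jobs.hasNext) Some(jobs.next()) else None
    }

    // A lane runs one job, and only when that job's Future completes does it
    // pull the next element. flatMap on a completed Future runs `job` on the
    // pool and turns an exception thrown by `job` into a failed Future.
    def lane(): Future[Unit] = nextJob() match {
      case Some(a) => Future.successful(a).flatMap(job).flatMap(_ => lane())
      case None    => Future.successful(())
    }

    Future.sequence(List.fill(n)(lane())).map(_ => ())
  }
}
```

If a job fails, its lane stops and the returned Future fails, while the other lanes keep draining the iterator.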
Upvotes: 2
Reputation: 167911
Your best bet with the standard library is probably not using parallel collections but concurrent.Future.traverse:
import scala.concurrent._
import ExecutionContext.Implicits.global
// Yields a Future[Iterator[Int]] with the squares in input order.
val squares = Future.traverse(Iterator(1, 2, 3))(i => Future { i * i })
though this starts all of the work as soon as it can: traverse walks the entire iterator up front and creates one Future per element, so nothing limits how far ahead it runs.
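If bounding the work is the concern, one hedged variation is to apply Future.traverse to one chunk at a time. This minimal sketch assumes Scala 2.12 or 2.13; ChunkedTraverse and chunkedParMap are hypothetical names. Iterator.grouped pulls chunkSize elements, traverse runs them concurrently, and the consumer waits for the whole chunk before the next one is pulled, so results stay in input order.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ChunkedTraverse {
  // Hypothetical helper: runs Future.traverse over one chunk of `chunkSize`
  // elements at a time, so at most `chunkSize` computations run concurrently
  // and the input iterator is consumed only one chunk ahead of the caller.
  def chunkedParMap[A, B](it: Iterator[A], chunkSize: Int)(f: A => B)
                         (implicit ec: ExecutionContext): Iterator[B] = {
    require(chunkSize > 0, "chunkSize must be positive")
    it.grouped(chunkSize).flatMap { chunk =>
      // Blocks the consuming thread until every element of this chunk is done.
      Await.result(Future.traverse(chunk)(a => Future(f(a))), Duration.Inf)
    }
  }
}
```

Compared with a sliding window of in-flight Futures, each chunk ends with a barrier: one slow element holds up the whole chunk before any later element starts.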
Upvotes: 4