ziggystar

Reputation: 28676

Parallel iterator in Scala

Is it possible, using Scala's parallel collections, to parallelize an Iterator without evaluating it completely beforehand?

Here I am talking about parallelizing the functional transformations on an Iterator, namely map and flatMap. I think this requires evaluating some elements of the Iterator in advance and then computing more once some have been consumed via next.

Everything I could find requires converting the iterator to an Iterable or, at best, a Stream. The Stream then gets completely evaluated when I call .par on it.

I also welcome implementation proposals if this is not readily available. Implementations should support parallel map and flatMap.
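For the implementation question, one possible approach that avoids parallel collections entirely is a bounded lookahead window: keep up to `window` elements being mapped as Futures, and pull another element from the source each time a result is consumed. The sketch below assumes Scala 2.12 or later and uses only `scala.concurrent`; `LookaheadParIterator`, `parMap`, `parFlatMap` and `window` are hypothetical names, not an existing API.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object LookaheadParIterator {
  // Hypothetical helper (not a library API): maps `f` over `it` in parallel
  // with at most `window` results in flight. Elements are pulled from `it`
  // only as results are consumed, so `it` is never evaluated completely up
  // front, and the output order matches the input order.
  def parMap[A, B](it: Iterator[A], window: Int)(f: A => B)(implicit ec: ExecutionContext): Iterator[B] = {
    require(window > 0, "window must be positive")
    new Iterator[B] {
      private val inFlight = mutable.Queue.empty[Future[B]]

      // Start futures for pending elements until `window` are in flight.
      // `it.next()` is only ever called on the consuming thread.
      private def fill(): Unit =
        while (inFlight.size < window && it.hasNext) {
          val a = it.next()
          inFlight.enqueue(Future(f(a)))
        }

      def hasNext: Boolean = { fill(); inFlight.nonEmpty }

      def next(): B = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        // Only the consumer blocks here; the worker threads never wait.
        val b = Await.result(inFlight.dequeue(), Duration.Inf)
        fill() // refill the freed slot so the window stays full
        b
      }
    }
  }

  // Parallel flatMap expressed as a parallel map followed by a sequential flatten.
  def parFlatMap[A, B](it: Iterator[A], window: Int)(f: A => Seq[B])(implicit ec: ExecutionContext): Iterator[B] =
    parMap(it, window)(f).flatMap(bs => bs)
}

object LookaheadDemo {
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    // Works on an infinite source: only about `window` elements are pulled ahead.
    val squares = LookaheadParIterator.parMap(Iterator.from(1), window = 8)(n => n.toLong * n)
    println(squares.take(5).toList) // List(1, 4, 9, 16, 25)
    val pairs = LookaheadParIterator.parFlatMap(Iterator(1, 2, 3), window = 2)(n => Seq(n, -n))
    println(pairs.toList) // List(1, -1, 2, -2, 3, -3)
  }
}
```

`Await.result` blocks only the consuming thread. If iteration stops early (as with `take(5)` above), up to `window` futures may still be computing in the background. `parFlatMap` parallelizes only the calls to `f`; flattening the resulting `Seq`s happens sequentially on the consumer.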

Upvotes: 16

Views: 7934

Answers (4)

ms-tg

Reputation: 2698

I realize that this is an old question, but does the ParIterator implementation in the iterata library do what you were looking for?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

Upvotes: 6

warpedjavaguy

Reputation: 111

It's a bit hard to follow exactly what you're after, but perhaps it's something like this:

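val f = (x: Int) => x + 1
val (left, right) = (0 to 9).toStream.splitAt(6)
// Map after .par so that f itself runs in parallel on the first 6 elements
// (mapping the Stream first and then calling .par would evaluate f sequentially).
val firstSix = left.par.map(f)
// The rest stays a Stream, mapped on demand.
val rest = right.map(f)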

This will evaluate f on the first 6 elements in parallel and then give you a stream over the rest.

Upvotes: 0

som-snytt

Reputation: 39587

From the scala-user mailing list thread "Traversing iterator elements in parallel":

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

I moved off Future.traverse for a similar reason. For my use case, keeping N jobs working, I ended up writing code that throttles how jobs are fed from the job queue to the execution context.

My first attempt blocked the feeder thread, but that risked also blocking tasks that wanted to spawn tasks on the execution context. What do you know, blocking is evil.
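A non-blocking version of such a throttle can be sketched as follows, assuming jobs arrive as an Iterator of thunks that each start a Future. `Throttle`, `runThrottled` and the demo are hypothetical names for illustration, not code from the thread. Instead of a feeder thread that waits while N jobs are running, each job's completion callback pulls the next job from the queue under a lock, so no thread ever blocks waiting for a slot.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

object Throttle {
  // Hypothetical sketch: runs every job from `jobs` with at most `n` in flight.
  // No feeder thread is blocked; each finished job starts the next one in its
  // completion callback. Results are returned in input order.
  def runThrottled[A](jobs: Iterator[() => Future[A]], n: Int)(implicit ec: ExecutionContext): Future[Vector[A]] = {
    require(n > 0, "n must be positive")
    val lock = new Object
    val done = Promise[Vector[A]]()
    val results = mutable.Map.empty[Int, A] // guarded by `lock`
    var started = 0                         // guarded by `lock`
    var running = 0                         // guarded by `lock`

    // Take the next job (if any) under the lock and start it; never blocks.
    def launch(): Unit = {
      val picked = lock.synchronized {
        if (jobs.hasNext) {
          val i = started
          started += 1
          running += 1
          Some((i, jobs.next()))
        } else None
      }
      picked.foreach { case (i, mkJob) =>
        val job = try mkJob() catch { case NonFatal(e) => Future.failed(e) }
        job.onComplete {
          case Failure(e) =>
            done.tryFailure(e) // the first failure fails the whole run
          case Success(a) =>
            val allDone = lock.synchronized {
              results(i) = a
              running -= 1
              running == 0 && !jobs.hasNext
            }
            if (allDone) done.trySuccess(lock.synchronized(Vector.tabulate(started)(k => results(k))))
            else if (!done.isCompleted) launch() // refill the freed slot
        }
      }
    }

    (1 to n).foreach(_ => launch())
    if (lock.synchronized(started == 0)) done.trySuccess(Vector.empty) // no jobs at all
    done.future
  }
}

object ThrottleDemo {
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val active = new AtomicInteger(0)
    val maxActive = new AtomicInteger(0)
    val jobs = Iterator.range(0, 20).map { i => () =>
      Future {
        val now = active.incrementAndGet()
        maxActive.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        Thread.sleep(10)
        active.decrementAndGet()
        i * 2
      }
    }
    val out = Await.result(Throttle.runThrottled(jobs, n = 3), 10.seconds)
    println(out == Vector.tabulate(20)(_ * 2)) // true
    println(maxActive.get <= 3)                // true
  }
}
```

The lock only guards the shared counters, the result map and the job iterator; the jobs themselves run outside it, so a job that spawns further tasks on the same execution context is never held up by the throttle.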

Upvotes: 2

Rex Kerr

Reputation: 167911

Your best bet with the standard library is probably not using parallel collections but concurrent.Future.traverse:

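import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

// Starts one Future per element and completes once every element has been squared
val squares = Future.traverse(Iterator(1, 2, 3))(i => Future { i * i })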

though I think this consumes the whole iterator up front, starting every task as soon as it can.
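If the concern is that Future.traverse starts everything at once, one simple way to bound it is to traverse the iterator one batch at a time using `grouped`. This is a sketch under that assumption; `BatchedTraverse` and `traverseInBatches` are hypothetical names, not part of the standard library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchedTraverse {
  // Hypothetical helper: like Future.traverse, but only `batchSize` elements are
  // pulled from `it` and started at a time; the next batch starts only after the
  // whole previous batch has finished. Results keep the input order.
  def traverseInBatches[A, B](it: Iterator[A], batchSize: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Vector[B]] = {
    val batches = it.grouped(batchSize)
    def loop(acc: Vector[B]): Future[Vector[B]] =
      if (!batches.hasNext) Future.successful(acc)
      else {
        val batch = batches.next() // pulls at most batchSize elements
        Future.traverse(batch)(f).flatMap(bs => loop(acc ++ bs))
      }
    loop(Vector.empty)
  }
}

object BatchedDemo {
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val result = BatchedTraverse.traverseInBatches(Iterator(1, 2, 3, 4, 5), batchSize = 2)(i => Future(i * i))
    println(Await.result(result, 10.seconds)) // Vector(1, 4, 9, 16, 25)
  }
}
```

The trade-off against a sliding window is that one slow element holds up its whole batch before the next batch can start.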

Upvotes: 4
