bwawok

Reputation: 15377

Scala Parallel Collections: How to return early?

I have a list of possible input values:

val inputValues = List(1,2,3,4,5)

I have a function that takes a really long time to compute and gives me a result:

def reallyLongFunction( input: Int ) : Option[String] = { ..... }

Using Scala parallel collections, I can easily do

inputValues.par.map( reallyLongFunction( _ ) )

to get all the results in parallel. The problem is that I don't really want all the results; I only want the FIRST one. As soon as one of my inputs succeeds, I want that output and want to move on with my life. Computing everything does a lot of extra work.

So how do I get the best of both worlds? I want to:

  1. Get the first result for which my long function returns something (a Some).
  2. Stop all my other threads from doing useless work.

Edit: I solved it like a dumb Java programmer by having

@volatile var done = false

which is set and checked inside my reallyLongFunction. This works, but it does not feel very Scala-like. I would like a better way to do this.
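A minimal sketch of that flag approach, under a few stated assumptions: the flag is an AtomicBoolean rather than a @volatile var, so that exactly one successful input can claim it with compareAndSet; the inputs run as standard-library Futures instead of .par, so the sketch compiles without the separate parallel-collections module that Scala 2.13+ requires; and the body of reallyLongFunction (sliced sleeps, "even inputs succeed") is purely hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object VolatileFlagSketch {
  // Hypothetical stand-in for reallyLongFunction: it works in small slices
  // and checks the shared flag between slices so it can give up early.
  def reallyLongFunction(input: Int, done: AtomicBoolean): Option[String] = {
    var step = 0
    while (step < 50 && !done.get) {
      Thread.sleep(10) // one slice of the expensive work
      step += 1
    }
    // Hypothetical success rule: even inputs succeed. compareAndSet lets
    // exactly one of them claim the win; everyone else returns None.
    if (input % 2 == 0 && done.compareAndSet(false, true)) Some(s"result for $input")
    else None
  }

  def firstResult(inputs: List[Int]): Option[String] = {
    val done = new AtomicBoolean(false) // fresh flag for each search
    val futures = inputs.map(i => Future(reallyLongFunction(i, done)))
    // Once the flag is set, the remaining calls exit at their next check,
    // so waiting for all of them is cheap.
    val results = Await.result(Future.sequence(futures), 10.seconds)
    results.flatten.headOption
  }
}
```

Which even input wins depends on scheduling, but at most one call ever returns a Some, and calls still in progress when the flag goes up stop at their next check.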

Upvotes: 14

Views: 1335

Answers (3)

Greg Campbell

Reputation: 15302

If you're willing to use a non-core library, I think Futures would be a good match for this task. For instance:

...both of which appear to enable the functionality you're looking for.
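A hedged sketch of the Futures idea using the standard library's scala.concurrent.Future and Promise (core Scala since 2.10) rather than a non-core library. The first future to produce a Some completes the promise, so the caller returns without waiting for the rest. Futures cannot be cancelled, so a shared AtomicBoolean asks the stragglers to stop; that flag, the helper names, and the body of reallyLongFunction are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstSuccessSketch {
  // Hypothetical long function: input n takes about n * 100 ms, polling
  // `stop` between 10 ms slices so it can be abandoned.
  def reallyLongFunction(input: Int, stop: AtomicBoolean): Option[String] = {
    var step = 0
    while (step < input * 10 && !stop.get) {
      Thread.sleep(10)
      step += 1
    }
    // Hypothetical success rule: inputs >= 3 succeed unless told to stop.
    if (!stop.get && input >= 3) Some(s"result for $input") else None
  }

  def firstResult(inputs: List[Int]): Option[String] = {
    val stop = new AtomicBoolean(false)
    val winner = Promise[Option[String]]()
    val futures = inputs.map(i => Future(reallyLongFunction(i, stop)))
    // Whichever future first yields a Some completes the promise; later
    // trySuccess calls are no-ops.
    futures.foreach(_.foreach(r => if (r.isDefined) winner.trySuccess(r)))
    // Fallback once everything has finished: the first Some, or None.
    Future.sequence(futures).foreach(rs => winner.trySuccess(rs.flatten.headOption))
    val found = Await.result(winner.future, 10.seconds)
    stop.set(true) // Futures cannot be cancelled, so ask the rest to quit
    found
  }
}
```

The Promise is used instead of Future.find because it makes "first to finish wins" explicit, independent of how a given Scala version orders its search.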

Upvotes: 2

Luigi Plinge

Reputation: 51109

I interpreted your question the same way as huynhjl, but if you just want to search and discard Nones, you could do something like this, which avoids repeating the computation once a suitable outcome is found:

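// Wraps one input; `result` is computed on first access and then cached
class Computation[A,B](value: A, function: A => B) {
  lazy val result = function(value)
}

def f(x: Int) = {          // your function here
  Thread.sleep(100 - x)
  if (x > 5) Some(x * 10)
  else None
}

// On Scala 2.13+, .par needs the scala-parallel-collections module plus
// import scala.collection.parallel.CollectionConverters._
val list = List.range(1, 20) map (i => new Computation(i, f))
val found = list.par find (_.result.isDefined)
  //found is Option[Computation[Int,Option[Int]]]
val result = found map (_.result.get)   // reuses the cached result; f is not called again
  //result is Option[Int]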
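To see what the lazy val buys, here is a sequential sketch (plain List.find instead of .par, so the call count is deterministic; the calls counter is a hypothetical addition). find forces result for each element it inspects, and the final map reads the cached value instead of calling f a second time.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyMemoSketch {
  class Computation[A, B](value: A, function: A => B) {
    lazy val result: B = function(value) // evaluated on first access, then cached
  }

  val calls = new AtomicInteger(0) // how many times the expensive function ran

  def f(x: Int): Option[Int] = {
    calls.incrementAndGet()
    if (x > 5) Some(x * 10) else None
  }

  def run(): Option[Int] = {
    val list = List.range(1, 20).map(i => new Computation[Int, Option[Int]](i, f))
    val found = list.find(_.result.isDefined) // evaluates f for 1..6, stops at 6
    found.map(_.result.get)                    // reads the cached value; no extra call
  }
}
```

Calling run() once yields Some(60) with f invoked six times (inputs 1 to 6); without the lazy val, the final map would trigger a seventh call.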

However, find on parallel collections seems to do a lot of unnecessary work (see this question), so this might not work well, at least with current versions of Scala.

Volatile flags are used in the parallel collections themselves (take a look at the source for find, exists, and forall), so I think your idea is a good one. It's actually better if you can include the flag in the function itself. This breaks the referential transparency of your function (i.e. for certain inputs it now sometimes returns None rather than Some), but since you're discarding the stopped computations, this shouldn't matter.

Upvotes: 3

Havoc P

Reputation: 8477

(Update: no, this doesn't work. It doesn't do the map, because find returns the input value it stopped at, not the result of reallyLongFunction.)

Would it work to do something like:

inputValues.par.find({ v => reallyLongFunction(v); true })

The implementation uses this:

  protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Find[U]] {
    @volatile var result: Option[U] = None
    def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Find(pred, p)
    override def merge(that: Find[U]) = if (this.result == None) result = that.result
  }

which looks pretty similar in spirit to your @volatile flag, except that you don't have to look at it ;-)
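One way to keep the early exit and still "do the map" is to record the function's result at the moment the predicate succeeds. The sketch below is a hand-rolled illustration loosely modeled on the Find task quoted above, not the library's actual splitter machinery: each chunk runs in a Future and checks a shared AtomicReference before every element, much as leaf checks pit.isAborted, and the first hit stores its mapped value. The chunking scheme, findMapped, and the body of reallyLongFunction are all hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FindMappedSketch {
  // Hypothetical stand-in for reallyLongFunction.
  def reallyLongFunction(input: Int): Option[String] = {
    Thread.sleep(20)
    if (input >= 4) Some(s"result for $input") else None
  }

  // Splits the inputs into chunks and scans each chunk in its own Future.
  // Before each element a chunk checks whether any chunk already has a hit
  // (the role pit.isAborted plays in Find); the first hit stores g's result.
  def findMapped[A, B](inputs: Seq[A], chunks: Int)(g: A => Option[B]): Option[B] = {
    require(chunks > 0, "need at least one chunk")
    val winner = new AtomicReference[Option[B]](None)
    val size = math.max(1, (inputs.size + chunks - 1) / chunks)
    val tasks = inputs.grouped(size).toList.map { chunk =>
      Future {
        val it = chunk.iterator
        while (it.hasNext && winner.get.isEmpty) {
          val r = g(it.next())
          if (r.isDefined) winner.compareAndSet(None, r) // only the first hit is kept
        }
      }
    }
    // Remaining chunks stop at their next check, so this wait is short.
    Await.result(Future.sequence(tasks), 10.seconds)
    winner.get
  }
}
```

Unlike the find call above, the value returned here is the Option produced by the function, so the expensive work is never repeated.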

Upvotes: 4
