How to read only Successful values from a Seq of Futures

I am learning akka/scala and am trying to read only those Futures that succeeded from a Seq[Future[Int]] but cant get anything to work.

  1. I simulated an array of 10 Future[Int] some of which fail depending on the value FailThreshold takes (all fail for 10 and none fail for 0).
  2. I then try to read them into an ArrayBuffer (could not find a way to return immutable structure with the values).
  3. Also, there isn't a filter on Success/Failure so had to run an onComplete on each future and update buffer as a side-effect.
  4. Even when the FailThreshold=0 and the Seq has all Future set to Success, the array buffer is sometimes empty and different runs return array of different sizes.

I tried a few other suggestions from the web like using Future.sequence on the list but this throws exception if any of future variables fail.

import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}

case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS) 

val FailThreshold = 0

class HeyActor(num: Int) extends Actor {
    def receive = {
        case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num

class FLPActor extends Actor {
    def receive = {
        case t: IndexedSeq[Future[Int]] => {
            val b = scala.collection.mutable.ArrayBuffer.empty[Int]
            t.foldLeft( b ){ case (bf,ft) => 
                ft.onComplete { case Success(v) => bf += ft.value.get.get }

val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")

// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs

The receive in FLPActor mostly gets

Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))

but the array buffer b has varying number of values and empty at times.

Can someone please point me to gaps here,

Allen Han
Instead of directly sending the IndexedSeq[Future[Int]], you should transform to Future[IndexedSeq[Int]] and then pipe it to the next actor. You don't send the Futures directly to an actor. You have to pipe it.

HeyActor can stay unchanged.


val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )

do a recover, and use Future.sequence to turn it into one Future:

val oneFut = Future.sequence(>{ case (ex: Throwable) => None})).map(_.flatten)

If you don't understand the business with Some, None, and flatten, then make sure you understand the Option type. One way to remove values from a sequence is to map values in the sequence to Option (either Some or None) and then to flatten the sequence. The None values are removed and the Some values are unwrapped.

After you have transformed your data into a single Future, pipe it over to FLPActor:

oneFut pipeTo flp

FLPActor should be rewritten with the following receive function:

def receive = {
  case printme: IndexedSeq[Int] => println(printme)

In Akka, modifying some state in the main thread of your actor from a Future or the onComplete of a Future is a big no-no. In the worst case, it results in race conditions. Remember that each Future runs on its own thread, so running a Future inside an actor means you have concurrent work being done in different threads. Having the Future directly modify some state in your actor while the actor is also processing some state is a recipe for disaster. In Akka, you process all changes to state directly in the primary thread of execution of the main actor. If you have some work done in a Future and need to access that work from the main thread of an actor, you pipe it to that actor. The pipeTo pattern is functional, correct, and safe for accessing the finished computation of a Future.

To answer your question about why FLPActor is not printing out the IndexedSeq correctly: you are printing out the ArrayBuffer before your Futures have been completed. onComplete isn't the right idiom to use in this case, and you should avoid it in general as it isn't good functional style.

Don't forget the import akka.pattern.pipe for the pipeTo syntax.

