Tvaroh
Tvaroh

Reputation: 6763

In akka-stream how to create a unordered Source from a futures collection

I need to create an akka.stream.scaladsl.Source[T, Unit] from a collection of Future[T].

E.g., having a collection of futures returning integers,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)

how to create a

val source: Source[Int, Unit] = ???

from it.

I cannot use Future.sequence combinator, since then I would wait for each future to complete before getting anything from the source. I want to get results in any order as soon as any future completes.

I understand that Source is a purely functional API and it should not run anything before somehow materializing it. So, my idea is to use an Iterator (which is lazy) to create a source:

Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}

But that would be a source of futures, not of actual values. I could also block on next using Await.result(future) but I'm not sure which tread pool's thread will be blocked. Also this will call futures sequentially, while I need parallel execution.

UPDATE 2: it turned out there was a much easier way to do it (thanks to Viktor Klang):

Source(futures).mapAsync(1)(identity)

UPDATE: here is what I've got based on @sschaef answer:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
  def run(actor: ActorRef): Unit = {
    futures.foreach { future =>
      future.onComplete {
        case Success(value) =>
          actor ! value
        case Failure(NonFatal(t)) =>
          actor ! Status.Failure(t) // to signal error
      }
    }

    Future.sequence(futures).onSuccess { case _ =>
      actor ! Status.Success(()) // to signal stream's end
    }
  }

  Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}

// ScalaTest tests follow

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

"futuresToSource" should "convert futures collection to akka-stream source" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future(3)

  whenReady {
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
  } { results =>
    results should contain theSameElementsAs Seq(1, 2, 3)
  }
}

it should "fail on future failure" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future.failed(new RuntimeException("future failed"))

  whenReady {
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
  } { t =>
    t shouldBe a [RuntimeException]
    t should have message "future failed"
  }
}

Upvotes: 10

Views: 3687

Answers (2)

Viktor Klang
Viktor Klang

Reputation: 26579

Creating a source of Futures and then "flatten" it via mapAsync:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804

Upvotes: 6

kiritsuku
kiritsuku

Reputation: 53348

One of the easiest ways to feed a Source is through an Actor:

import scala.concurrent.Future
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

implicit val system = ActorSystem("MySystem")

def run(actor: ActorRef): Unit = {
  import system.dispatcher
  Future { Thread.sleep(100); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(300); actor ! 3 }
}

val source = Source
  .actorRef[Int](0, OverflowStrategy.fail)
  .mapMaterializedValue(ref ⇒ run(ref))
implicit val m = ActorMaterializer()

source runForeach { int ⇒
  println(s"received: $int")
}

The Actor is created through the Source.actorRef method and made available through the mapMaterializedValue method. run simply takes the Actor and sends all the completed values to it, which can then be accessed through source. In the example above, the values are sent directly in the Future, but this can of course be done everywhere (for example in the onComplete call on the Future).

Upvotes: 5

Related Questions