Eduardo

Reputation: 8392

Akka Streams: how to wait until several Flows are completed

I have several Flows in my program that I would like to run in parallel. Once all of them have completed, I would like to trigger some action.

One way of doing it would be to send a message to an Actor after each completion; once the Actor has verified that all flows are done, it can trigger the action.
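The counting idea can be sketched without an Actor, assuming each flow exposes some completion callback. `CompletionCounter` and its methods are hypothetical names, not part of akka-streams; the sketch uses a plain atomic counter and a `Promise` from the standard library in place of actor messages.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Hypothetical helper (not an akka-streams API): counts completions
// and completes a future once `expected` flows have reported in.
final class CompletionCounter(expected: Int) {
  require(expected > 0, "expected must be positive")

  private val remaining = new AtomicInteger(expected)
  private val done = Promise[Unit]()

  // Call once from each flow's completion callback.
  def markCompleted(): Unit = {
    if (remaining.decrementAndGet() == 0) done.trySuccess(())
    ()
  }

  // Call if a flow fails; the combined future fails with the first reported cause.
  def markFailed(cause: Throwable): Unit = {
    done.tryFailure(cause)
    ()
  }

  // Completes after the last expected completion, or fails on the first failure.
  val allCompleted: Future[Unit] = done.future
}
```

The final action would then be attached to `allCompleted`, for example with `onComplete`.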

I was wondering if there was anything within the akka-streams Scala DSL that I may be overlooking that would make it even simpler.

EDIT: Converting a Flow to a future would not work because, as the documentation mentions, the Future is completed right after the first event that happens in the stream. Running the following code:

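import scala.concurrent.duration._

implicit val system = ActorSystem("Sys")
import system.dispatcher // execution context required by onComplete
val fm = FlowMaterializer(MaterializerSettings())

def main(args: Array[String]): Unit = {
  // Tick source: invokes the function once per second, indefinitely
  val fut = Flow(1.second, {() => println("tick")}).toFuture(fm)

  fut.onComplete{ _ =>
    println("future completed")
  }
}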

Prints "tick", followed by "future completed", and then an infinite sequence of "tick"s.

Upvotes: 7

Views: 5578

Answers (2)

Julio

Reputation: 718

Oh I see. If the flow processes multiple elements, the future will complete right after the first one.

I think you can use flow.onComplete to complete a promise per flow, and then combine the promises' futures, e.g.

import scala.concurrent.Promise
import scala.util.{Failure, Success}

val promise1 = Promise[Unit]()
val promise2 = Promise[Unit]()

val flow1 = Flow(Iterator(1,2,3,4)).map(println)
val flow2 = Flow(Iterator('a,'b,'c,'d)).map(println)

// Complete each promise when its flow finishes or fails
flow1.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) => promise1.success(())
  case Failure(e) => promise1.failure(e)
}
flow2.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) => promise2.success(())
  case Failure(e) => promise2.failure(e)
}

for {
  e1<- promise1.future
  e2<- promise2.future
}{
  println(s"completed!")
}
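The same promise-per-flow idea extends to any number of flows by combining the futures with `Future.sequence`. This is a minimal sketch using only the standard library; `AllCompleted`, `whenAll` and `completeWith` are hypothetical names, and wiring `completeWith` into a flow's completion callback is left as an assumption about the stream API in use.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object AllCompleted {
  // Combine one promise per flow into a single future that completes
  // when every promise has succeeded, and fails if any of them fails.
  def whenAll(promises: Seq[Promise[Unit]])(implicit ec: ExecutionContext): Future[Unit] =
    Future.sequence(promises.map(_.future)).map(_ => ())

  // Bridge a Try-based completion callback to a promise,
  // discarding the success value and keeping any failure.
  def completeWith(p: Promise[Unit])(result: Try[Any]): Unit = {
    p.tryComplete(result.map(_ => ()))
    ()
  }
}
```

With an implicit execution context in scope, `AllCompleted.whenAll(Seq(promise1, promise2)).foreach(_ => println("completed!"))` would play the role of the for-comprehension above.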

If, on the other hand, you want to do something after each pair of elements has been processed, you can probably use flow1.zip(flow2) to combine them.

Upvotes: 0

cmbaxter

Reputation: 35443

As mentioned in the comment, I believe @Eduardo is right about converting the Flow to a Future. Consider this example:

implicit val system = ActorSystem("Sys")
import system.dispatcher

// stripMargin only strips up to a leading '|', so each continuation line needs one
val text1 =
  """hello1world
    |foobar""".stripMargin

val text2 =
  """this1is
    |a1test""".stripMargin

def flowFut(text:String) = Flow(text.split("\\s").toVector)
  .map(_.toUpperCase())
  .map(_.replace("1", ""))
  .toFuture(FlowMaterializer(MaterializerSettings()))    


val fut1 = flowFut(text1)    
val fut2 = flowFut(text2)    
val fut3 = for{
  f1 <- fut1
  f2 <- fut2
} yield {
  s"$f1, $f2"
}

fut3 foreach {println(_)}

Here, I run the same two transforms on each set of text lines: converting to upper case and removing the character 1 from the text. I then convert each Flow to a Future so that I can compose the results into a new Future, which I then print out.
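The composition step itself does not depend on the stream API. As a rough sketch, the version below replaces the Flow with a plain `Future` over a collection (an assumption made so the example runs with only the standard library), so each future carries the whole transformed `Vector` rather than whatever `toFuture` yields. `ComposeResults`, `transform` and `combined` are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ComposeResults {
  // Same transformation as above, applied to a plain collection:
  // split on whitespace, upper-case, and remove the character '1'.
  def transform(text: String): Vector[String] =
    text.split("\\s+").toVector
      .filter(_.nonEmpty)
      .map(_.toUpperCase)
      .map(_.replace("1", ""))

  def combined(text1: String, text2: String)(implicit ec: ExecutionContext): Future[String] = {
    // Both futures are created before they are composed, so they may run concurrently.
    val fut1 = Future(transform(text1))
    val fut2 = Future(transform(text2))
    fut1.zip(fut2).map { case (a, b) => s"${a.mkString(" ")}, ${b.mkString(" ")}" }
  }
}
```

Creating the futures before the composition matters: if they were created inside a for-comprehension instead, the second would only start after the first had completed.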

Upvotes: 9
