Reputation: 8392
I have several Flows in my program that I would like to process in parallel. After all are completed, I would like to trigger some action.
One way of doing it would be to send a message to an Actor after each completion, and when the Actor verifies that all flows are ready, then it can trigger the action.
I was wondering if there was anything within the akka-streams Scala DSL that I may be overlooking that would make it even simpler.
EDIT: Converting a Flow to a future would not work because, as the documentation mentions, the Future is completed right after the first event that happens in the stream. Running the following code:
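import scala.concurrent.duration._

implicit val system = ActorSystem("Sys")
import system.dispatcher // execution context for fut.onComplete
val fm = FlowMaterializer(MaterializerSettings())

def main(args: Array[String]): Unit = {
  // Emits a "tick" every second; the stream never completes on its own.
  val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)
  fut.onComplete { _ =>
    println("future completed")
  }
}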
This prints "tick", followed by "future completed", and then an infinite sequence of "tick"s.
Upvotes: 7
Views: 5578
Reputation: 718
Oh I see. If the flow processes multiple elements, the future will complete right after the first one.
I think you can use flow.onComplete to complete some promises, e.g.:
import scala.concurrent.Promise
import scala.util.{Failure, Success}

val promise1 = Promise[Unit]()
val promise2 = Promise[Unit]()
val flow1 = Flow(Iterator(1,2,3,4)).map(println)
val flow2 = Flow(Iterator('a,'b,'c,'d)).map(println)
// Complete each promise when its flow finishes, successfully or not.
flow1.onComplete(FlowMaterializer(MaterializerSettings())) {
  case Success(_) => promise1.success(())
  case Failure(e) => promise1.failure(e)
}
flow2.onComplete(FlowMaterializer(MaterializerSettings())) {
  case Success(_) => promise2.success(())
  case Failure(e) => promise2.failure(e)
}
// Runs once both promises have succeeded (needs an implicit ExecutionContext in scope).
for {
  e1 <- promise1.future
  e2 <- promise2.future
} {
  println("completed!")
}
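With more than two flows, the for-comprehension above gets unwieldy. A minimal sketch of one alternative, using only the Scala standard library: collect one completion future per flow and combine them with Future.sequence. The object and method names (AwaitAll, whenAllComplete) are hypothetical, and the promises in main merely stand in for promises completed from each flow's onComplete callback.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object AwaitAll {
  // Hypothetical helper: the returned future succeeds once every input future
  // has succeeded, and fails if any of them fails.
  def whenAllComplete(completions: Seq[Future[Unit]])(implicit ec: ExecutionContext): Future[Unit] =
    Future.sequence(completions).map(_ => ())

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    // Stand-ins for one promise per flow (an assumption for illustration).
    val promises = Vector.fill(3)(Promise[Unit]())
    val allDone = whenAllComplete(promises.map(_.future))

    // Simulate the flows finishing, in arbitrary order.
    promises.reverse.foreach(_.success(()))

    Await.result(allDone, 1.second)
    println("all flows completed")
  }
}
```

Blocking with Await is only there to keep the demo process alive; in an application you would more likely attach the follow-up action with foreach or onComplete.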
If, on the other hand, you want to do something after each pair of elements has been processed, you can probably use flow1.zip(flow2) to combine them.
Upvotes: 0
Reputation: 35443
As mentioned in the comment, I believe @Eduardo is right about converting the Flow to a Future. Consider this example:
implicit val system = ActorSystem("Sys")
import system.dispatcher

val text1 =
  """hello1world
    |foobar""".stripMargin

val text2 =
  """this1is
    |a1test""".stripMargin

def flowFut(text: String) = Flow(text.split("\\s").toVector)
  .map(_.toUpperCase())
  .map(_.replace("1", ""))
  .toFuture(FlowMaterializer(MaterializerSettings()))

val fut1 = flowFut(text1)
val fut2 = flowFut(text2)
val fut3 = for {
  f1 <- fut1
  f2 <- fut2
} yield {
  s"$f1, $f2"
}

fut3 foreach { println(_) }
Here, I run two separate transforms on each set of text lines: converting to upper case and removing the character 1 from the text. I then convert each Flow to a Future so that I can compose the results into a new Future, which I then print out.
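Note that fut1 and fut2 are created before the for-comprehension, so both are already running when they are combined. As a rough sketch of the same composition using only the standard library, Future.zip pairs the two results directly. The object name ZipResults, the helper combine, and the placeholder strings are assumptions for illustration, not output taken from the flows above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipResults {
  // Hypothetical helper: waits for both futures and formats their results.
  // The combined future fails if either input fails.
  def combine(a: Future[String], b: Future[String]): Future[String] =
    a.zip(b).map { case (x, y) => s"$x, $y" }

  def main(args: Array[String]): Unit = {
    // Placeholder values standing in for flowFut(text1) and flowFut(text2).
    val fut1 = Future("FIRST")
    val fut2 = Future("SECOND")
    println(Await.result(combine(fut1, fut2), 1.second))
  }
}
```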
Upvotes: 9