Reputation: 4336
I've created a Graph
whcih contains a Balance
. This Balance
distributes the load over 5 Flows
. What I expected what would happen was that every instance of my Flow
would run on a seperate Thread
. However, this is not what happens.
When I'm printing the Thread
name I notice that all Flows
are being executed on the same Thread
.
The code I'm using is:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
val out = Sink.ignore
val bal = builder.add(Balance[Int](5))
val merge = builder.add(Merge[Int](5))
val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
println(Thread.currentThread())
x
}).async
in ~> bal ~> f1 ~> merge ~> out
bal ~> f2 ~> merge
bal ~> f3 ~> merge
bal ~> f4 ~> merge
bal ~> f5 ~> merge
ClosedShape
})
This outputs:
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
My expectation was that the output would be something along the lines of:
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
How can I change this code sample so that the Flows
are being executed in parallel?
Upvotes: 0
Views: 628
Reputation: 9023
The async directive does not guarantee your stages will be executed in a separated thread. As long as the stages do not overlap in time, they might run on the same thread.
For your specific case, the executed steps might be the following:
Now if you change your Balance as follows
val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))
You will be forcing 5 threads to be spawned, as the steps would be
Upvotes: 3