Reputation: 535
I'm attempting to use Akka Streams to concurrently process a series of dependent streams.
Something like this:
val concurrency = 2
Source(
  (1 to 5).toStream.map(i => {
    println(s"1: Emitting $i")
    i.toString
  }))
  .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
  .mapConcat(identity)
  .map(x => println(s"4: Received $x after ${System.currentTimeMillis() - start}"))
  .runWith(Sink.ignore)
My problem is that it doesn't appear to be running concurrently. Changing the concurrency variable has no effect beyond 2. I suspect that mapConcat is serializing the processing, but I'm not sure.
A full, runnable example of the problem can be found here: https://github.com/realrunner/akka-stream-example.
Currently, the code takes 11 seconds to complete. I could easily cut that down using raw actors, but then I'd lose proper backpressure handling. Any ideas on how to make this more concurrent?
Upvotes: 0
Views: 305
Reputation: 9023
Each and every one of these getNextStream calls goes out and uses ask against one single(ton) actor. Keep in mind that an actor processes its incoming messages one at a time.
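
To make that concrete, here is a rough sketch of the shape of actor this describes (the type and field names are hypothetical stand-ins; the real code is in the linked repo). Because receive handles one message at a time, every ask issued by mapAsyncUnordered queues up in the mailbox behind the current Thread.sleep, no matter how high the stream's concurrency is set:

import akka.actor.Actor

// Hypothetical stand-ins for the request/response types used in the linked repo
final case class Work(input: String, delayMs: Long)
final case class Out(x: String)

class StreamWorker extends Actor {
  def receive: Receive = {
    case Work(input, delayMs) =>
      Thread.sleep(delayMs)                                   // occupies the actor for the whole "processing" time
      sender() ! List(Out(input + "-a"), Out(input + "-b"))   // the reply the stream's ask is waiting for
  }
}

With that shape, raising concurrency in the stream only queues more asks in the same mailbox; it does not make the actor handle anything in parallel.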
On top of that, while processing each message you block the actor with Thread.sleep. Blocking is generally discouraged within Akka - see this bit of the docs.
Depending on the real behaviour of your actors, you can either simulate long processing in a non-blocking way using the after pattern (see docs), or, if blocking is really needed - but double and triple check that - block on a dedicated dispatcher (as explained here).
Upvotes: 1