Michael Nelson
Michael Nelson

Reputation: 535

How to make fan-out akka-streams processing concurrent

I'm attempting to use Akka Streams to concurrently process a series of dependent streams.

Something like this:

val concurrency = 2
Source(
  (1 to 5).toStream.map(i => {
    println(s"1: Emitting $i")
    i.toString
  }))
  .mapAsyncUnordered(concurrency)(s => getNextStream(s, 25))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 50))
  .mapConcat(identity)
  .mapAsyncUnordered(concurrency)(out => getNextStream(out.x, 100))
  .mapConcat(identity)
  .map(x => println(s"4: Received $x after ${System.currentTimeMillis() - start}"))
  .runWith(Sink.ignore)

My problem is it doesn't appear to be running concurrently. Changing the concurrency variable has no effect beyond 2. I suspect that mapConcat is serializing the processing but I'm not sure.

A full, runnable example of the problem can be found here: https://github.com/realrunner/akka-stream-example.

Currently, the code takes 11 seconds to complete. I could easily cut it down using raw Actors, without the benefit of properly handling backpressure. Any ideas on how to make this more concurrent?

Upvotes: 0

Views: 305

Answers (1)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

Each and every one of these getNextStream calls go out and use ask against one single(ton) actor. Keep in mind an actor processes incoming messages in a serialized manner.

Now when processing the message, you block the actor by using Thread.sleep. Blocking is generally discouraged within Akka - see this bit of the docs.

Depending on what is the real behaviour of your actors, you can simulate long processing in a non-blocking way using the after pattern (see docs), or if blocking is really needed - but double, triple check that - you can block on a dedicated dispatcher (as explained here).

Upvotes: 1

Related Questions