Calardan
Calardan

Reputation: 137

Akka Streams ActorRefSource order of messages

I want to build a Sequence of items by using akka Streams' ActorRefSource. Said source is fed continuously with data. After computation has completed, the Stream gets terminated with a Poison Pill.

The following simplified example shows my intention:

val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
    .mapMaterializedValue{ ref =>
      for(i <- 1 to 1000) {
        ref ! i
      }

      ref ! PoisonPill
    }

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size))

I was expecting the Stream to process all 1000 elements and then terminate due to the Poison Pill being received. Unfortunately, the Stream usually terminates much earlier. Example outputs are:

count: 24

Waiting some time before sending the Poison Pill, e.g. 1000 ms results in all numbers being processed.

Any idea on how to make sure all items have been processed before the Poison Pill has been received would be highly appreciated.

Upvotes: 2

Views: 249

Answers (1)

Roland Kuhn
Roland Kuhn

Reputation: 15472

See the documentation for Source.actorRef: PoisonPill does not flush the buffer before terminating the stream.

Upvotes: 2

Related Questions