Xiaohe Dong

Reputation: 5023

Sink fold for akka stream Source.actorRef buffer and OverflowStrategy

Here is a code snippet from the Akka documentation:

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
  .toMat(sinkUnderTest)(Keep.both).run()

ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")

val result = Await.result(future, 3.seconds)
assert(result == "123")

This snippet works. However, if I send one more message through ref, e.g. ref ! 4, I get an exception: akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

I assumed a buffer size of 3 would be enough, because the fold operation has the shape (acc, ele) => acc: it takes the accumulator and one element and returns the new accumulator, so elements should be consumed one at a time.
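For reference, the fold shape described here can be sketched with plain Scala collections. This is only an analogy for what Sink.fold("")(_ + _) computes over the stream's elements, and the object name FoldAnalogy is made up for illustration; it says nothing about buffering or timing.

```scala
object FoldAnalogy {
  // Same shape as Sink.fold("")(_ + _): start from the empty string and
  // combine the accumulator with one element at a time.
  val concatenated: String =
    Seq(1, 2, 3).map(_.toString).foldLeft("")((acc, ele) => acc + ele)

  def main(args: Array[String]): Unit =
    println(concatenated)
}
```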

So I changed the code to wait for 3 seconds before sending the fourth message, and it works again.

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  ref ! 1
  ref ! 2
  ref ! 3
  Thread.sleep(3000)
  ref ! 4
  ref ! akka.actor.Status.Success("done")

  val result = Await.result(future, 10.seconds)

  println(result)

My question is: is there a way to tell Akka Streams to slow down, or to wait until the sink is available? I also tried OverflowStrategy.backpressure, but it fails with "Backpressure overflowStrategy not supported".

Any other options?

Upvotes: 1

Views: 843

Answers (1)

Stefano Bonetti

Reputation: 9023

You should look into Source.queue as a way to inject elements into the stream from outside in a backpressure-aware fashion.

Source.queue materializes to a queue object that you can offer elements to. Each offer returns a Future that completes when the stream is ready to accept the element.

Example below:

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
    Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  Future.sequence(Seq(
    queue.offer(1),
    queue.offer(2),
    queue.offer(3),
    queue.offer(4)
  ))

  queue.complete()

  val result = Await.result(future, 10.seconds)

  println(result)
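The example above issues all four offers immediately. As far as I know, with OverflowStrategy.backpressure an offer made while a previous one is still pending on a full buffer comes back as a failed Future, so for more elements than the buffer holds it may be safer to chain the offers. Below is a minimal sketch of that idea, assuming Akka 2.5-style setup with an explicit ActorMaterializer (deprecated but still available in 2.6); the names SequentialOffers and run are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SequentialOffers {
  // Hypothetical helper: pushes `elements` through the queue one by one
  // and returns the folded string.
  def run(elements: Seq[Int]): String = {
    implicit val system: ActorSystem = ActorSystem("sequential-offers")
    // Assumption: Akka 2.5-style materializer.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    try {
      val sinkUnderTest =
        Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

      val (queue, future) =
        Source.queue[Int](3, OverflowStrategy.backpressure)
          .toMat(sinkUnderTest)(Keep.both)
          .run()

      // Chain the offers: each one starts only after the previous
      // offer's Future has completed.
      val offered: Future[QueueOfferResult] =
        elements.foldLeft(Future.successful[QueueOfferResult](QueueOfferResult.Enqueued)) {
          (previous, element) => previous.flatMap(_ => queue.offer(element))
        }

      // Complete the stream once every offer has gone through,
      // or fail it if any offer failed.
      offered.onComplete {
        case Success(_)  => queue.complete()
        case Failure(ex) => queue.fail(ex)
      }

      Await.result(future, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run(1 to 10))
}
```

Chaining with flatMap keeps at most one offer in flight, at the cost of not pipelining the offers. In a real application the chain would usually be driven by whatever produces the elements rather than by a pre-built sequence.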

More info in the docs.

Upvotes: 6
