Reputation: 5023
Here is the code snippet from akka documentation
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
It is a working code snippet, However, if I use ref to tell another message like ref ! 4
, I got an exception like akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
I guess that buffer size 3 should be enough. The reason is for fold operation is (acc, ele) => acc, so it takes accumulator and element to return new value accumulator.
So I changed the code let another actor tell wait for 3 secs. And it is working again.
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
However, my question is that is there a way we can tell Akka stream to slow down or wait for the sink to be available. I am also using the OverflowStrategy.backpressure
, but it said Backpressure overflowStrategy not supported
.
Any other options?
Upvotes: 1
Views: 843
Reputation: 9023
You should look into Source.queue
as a way to inject elements into the stream from outside in a backpressure-aware fashion.
Source.queue
will materialize to a queue object that you can offer elements to, however when you offer them you will get back a Future
that completes when the stream is ready to accept the message.
Example below:
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
Future.sequence(Seq(
queue.offer(1),
queue.offer(2),
queue.offer(3),
queue.offer(4)
))
queue.complete()
val result = Await.result(future, 10.seconds)
println(result)
More info in the docs.
Upvotes: 6