Reputation: 24518
I'm cutting my teeth on Akka streams and did a fibonacci publisher-subscriber example as follows. However, I don't quite understand yet how the demand is initially generated and what relation it has with the subscriber's request strategy. Can someone please explain?
FibonacciPublisher:
class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
private val queue = Queue[Long](0, 1)
def receive = {
case Request(_) => // _ is the demand
log.debug("Received request; demand = {}.", totalDemand)
publish
case Cancel =>
log.info("Stopping.")
context.stop(self)
case unknown => log.warning("Received unknown event: {}.", unknown)
}
final def publish = {
while (isActive && totalDemand > 0) {
val next = queue.head
queue += (queue.dequeue + queue.head)
log.debug("Producing fibonacci number: {}.", next)
onNext(next)
if (next > 5000) self ! Cancel
}
}
}
FibonacciSubscriber:
class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
val requestStrategy = WatermarkRequestStrategy(20)
def receive = {
case OnNext(fib: Long) =>
log.debug("Received Fibonacci number: {}", fib)
if (fib > 5000) self ! OnComplete
case OnError(ex: Exception) =>
log.error(ex, ex.getMessage)
self ! OnComplete
case OnComplete =>
log.info("Fibonacci stream completed.")
context.stop(self)
case unknown => log.warning("Received unknown event: {}.", unknown)
}
}
Fibonacci App:
val src = Source.actorPublisher(Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])
src.via(flow).runWith(sink)
Sample run: Question: Where did the initial demand for 4 come from?
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.
Upvotes: 3
Views: 1380
Reputation: 6237
The initial demand to your source is provided by the input buffer of your later stages. This, in turn, is configured via the ActorMaterializerSettings
instance that you pass when you initialise your ActorMaterializer
.
If you don't pass any specific setting akka will use provided configuration to initialise one; in the default configuration you can find that akka.stream.materializer.initial-input-buffer-size
is set to 4. Changing that should change your initial demand.
Upvotes: 1