Xiang Zhang
Xiang Zhang

Reputation: 2973

Does Akka stream sink as publisher handles back pressure for each subscriber?

I run one stream as publisher with fanout set to true.

Now if I start multiple streams start from this publisher, and they have different processing rate, will akka handles each subscriber back pressure separately?

I coded a test:

  val publisher: Publisher[Int] = Source.fromIterator{ () => Stream.from(1).iterator}.throttle(1, 1.second).runWith(Sink.asPublisher(true))
  // start the publisher
  Source.fromPublisher(publisher).runWith(Sink.ignore)

  Thread.sleep(2000)
  Source.fromPublisher(publisher).throttle(1, 2.seconds).runForeach(x => Logger.info(s"AAAAA: $x"))
  Source.fromPublisher(publisher).throttle(1, 4.seconds).runForeach(x => Logger.info(s"BBBBB: $x"))
  Source.fromPublisher(publisher).throttle(1, 6.seconds).runForeach(x => Logger.info(s"CCCCC: $x"))

Turns out the answer is true:

[info] application - CCCCC: 3
[info] application - BBBBB: 3
[info] application - AAAAA: 3
[info] application - AAAAA: 4
[info] application - AAAAA: 5
[info] application - BBBBB: 4
[info] application - AAAAA: 6
[info] application - CCCCC: 4
[info] application - AAAAA: 7
[info] application - BBBBB: 5
[info] application - AAAAA: 8
[info] application - CCCCC: 5
[info] application - BBBBB: 6
[info] application - AAAAA: 9
[info] application - AAAAA: 10
[info] application - BBBBB: 7
[info] application - AAAAA: 11
[info] application - AAAAA: 12

looks like the publisher provides one buffer for each subscriber and handles their back pressure separately. Am I right?

Upvotes: 0

Views: 238

Answers (1)

Ivan Stanislavciuc
Ivan Stanislavciuc

Reputation: 7275

I don't think this is the case. While I'm not able to explain it, the modified version of the app with a stream that suppose to be of a high rate does not produce desired fast rate.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

import concurrent.duration._

object StreamsPublisher extends App {

  implicit private val actorSystem = ActorSystem()
  private val value = ActorMaterializerSettings(actorSystem).withSupervisionStrategy{ex =>
    ex.printStackTrace()
    Supervision.Resume
  }
  private val materializer = ActorMaterializer(value)
  implicit private val mater = materializer


  val publisher: Publisher[Int] = Source
    .fromIterator { () =>
      println("new iterator created")
      Stream.from(1).iterator
    }
    .runWith(Sink.asPublisher(true))

  Source.fromPublisher(publisher).runWith(Sink.ignore)

  Thread.sleep(2000)
  Source
    .fromPublisher(publisher)
    .throttle(1, 2.seconds)
    .runForeach(x => println(s"AAAAA: $x"))
  Thread.sleep(3000)
  Source
    .fromPublisher(publisher)
    .throttle(1, 4.seconds)
    .runForeach(x => println (s"BBBBB: $x"))
  Thread.sleep(5000)
  Source
    .fromPublisher(publisher)
    .throttle(1, 6.seconds)
    .runForeach(x => println(s"CCCCC: $x"))
  Source
    .fromPublisher(publisher)
    .runForeach(x => println(s"DDDDDD: $x"))

}

The "fast" stream "DDDDD" does not run that fast.

Output is

new iterator created
AAAAA: 440617
AAAAA: 440618
BBBBB: 440633
AAAAA: 440619
AAAAA: 440620
BBBBB: 440634
CCCCC: 440633
DDDDDD: 440633
DDDDDD: 440634
DDDDDD: 440635
DDDDDD: 440636
DDDDDD: 440637
DDDDDD: 440638
DDDDDD: 440639
DDDDDD: 440640
DDDDDD: 440641
DDDDDD: 440642
DDDDDD: 440643
DDDDDD: 440644
DDDDDD: 440645
DDDDDD: 440646
DDDDDD: 440647
DDDDDD: 440648
AAAAA: 440621
AAAAA: 440622
BBBBB: 440635
AAAAA: 440623
DDDDDD: 440649
DDDDDD: 440650
DDDDDD: 440651
DDDDDD: 440652
DDDDDD: 440653
DDDDDD: 440654
DDDDDD: 440655
DDDDDD: 440656
AAAAA: 440624
CCCCC: 440634
BBBBB: 440636
AAAAA: 440625
AAAAA: 440626
BBBBB: 440637
AAAAA: 440627
CCCCC: 440635
AAAAA: 440628
BBBBB: 440638
AAAAA: 440629
AAAAA: 440630
CCCCC: 440636
BBBBB: 440639
AAAAA: 440631
DDDDDD: 440657
DDDDDD: 440658
DDDDDD: 440659
DDDDDD: 440660
DDDDDD: 440661
DDDDDD: 440662
DDDDDD: 440663
DDDDDD: 440664
AAAAA: 440632
BBBBB: 440640
AAAAA: 440633
CCCCC: 440637

Note that the "ignore" stream was fast and iterated to 440K before throttled streams joined.

It looks that there is buffer of small size which makes throttle dependent.

Upvotes: 0

Related Questions