Emilis Panovas

Reputation: 401

Akka streams: Why does Sink.head terminate stream with alsoTo broadcast?

Let's take a very simple case:

Source(1 to 10)
  .alsoTo(Sink.foreach(v => println(s"each: $v")))
  .toMat(Sink.head)(Keep.right)
  .run()

According to the alsoTo documentation, I would expect Sink.foreach to print all the elements. However, it only prints the first one. The same happens if I swap the positions of Sink.foreach and Sink.head.

If the broadcast is implemented via GraphDSL, however, the entire source is consumed even if one of the sinks is Sink.head.

EDIT: Documentation for alsoTo states the following:

Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.

To me this sounds like a broadcast, but maybe that is where I am mistaken. I could also read it as toMat being the part that controls the flow. In that case, I would expect the following to take all elements from the source:

Source(1 to 10)
  .alsoTo(Sink.head)
  .toMat(Sink.seq)(Keep.right)
  .run()

The GraphDSL version works as I would expect:

val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
  import GraphDSL.Implicits._
  val broadcast = builder.add(Broadcast[Int](2))
  source ~> broadcast.in
  broadcast.out(0) ~> s1
  broadcast.out(1) ~> s2
  ClosedShape
}).run()

Upvotes: 3

Views: 500

Answers (1)

Ivan Stanislavciuc

Reputation: 7275

The reason is that Sink.head consumes a single element and then completes itself. That completion is propagated upstream as a cancel, so the Source sends no further elements.

The code of akka.stream.impl.HeadOptionStage.onPush shows this:

      def onPush(): Unit = {
        p.trySuccess(Option(grab(in)))
        completeStage()
      }

The documentation for completeStage says:

Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.

Update

alsoTo is a broadcast that is configured with these parameters:

      val bcast = b.add(Broadcast[Out](2, eagerCancel = true))

Your GraphDSL version behaves differently because Broadcast defaults to eagerCancel = false.

The documentation for eagerCancel says:

if true, broadcast cancels upstream if any of its downstreams cancel.
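
To see the difference side by side, here is a minimal sketch that builds the same two-sink graph as in the question and only varies the eagerCancel flag. It assumes Akka 2.6 or later, where the materializer is derived implicitly from the ActorSystem (on 2.5 you would need an explicit ActorMaterializer). The names EagerCancelDemo and graph are made up for illustration, and Sink.seq replaces Sink.foreach so the elements that reached the second sink can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object EagerCancelDemo {
  // Hypothetical helper: same topology as the GraphDSL example in the question,
  // with eagerCancel exposed as a parameter.
  def graph(eagerCancel: Boolean): RunnableGraph[(Future[immutable.Seq[Int]], Future[Int])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Int], Sink.head[Int])((_, _)) { implicit builder => (all, first) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2, eagerCancel = eagerCancel))
        Source(1 to 10) ~> broadcast.in
        broadcast.out(0) ~> all   // collects every element it receives
        broadcast.out(1) ~> first // completes (and cancels) after one element
        ClosedShape
      })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("eager-cancel-demo")
    try {
      for (eager <- Seq(false, true)) {
        val (allF, firstF) = graph(eager).run()
        val first = Await.result(firstF, 5.seconds)
        val all = Await.result(allF, 5.seconds)
        println(s"eagerCancel=$eager head=$first seq=$all")
      }
    } finally system.terminate()
  }
}
```

With eagerCancel = false the Sink.seq side should receive all ten elements, because the cancel from Sink.head only closes that one output. With eagerCancel = true the same cancel tears down the whole broadcast, which is what alsoTo does, so the Sink.seq side is expected to stop early.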

Upvotes: 2
