Reputation: 401
Let's take a very simple case:
Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()
According to alsoTo
documentation, I would expect Sink.foreach
to print all the elements, however, it only prints first. Same happens if I switch Sink.foreach
and Sink.head
places.
If broadcast is implemented via GraphDSL
,however , the entire source is consumed even if one of the sinks is Sink.head
.
EDIT:
Documentation for alsoTo
states the following:
Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.
To me this sounds like a broadcast, but maybe this is where I make the mistake. I could also interpret that toMat
controls the flow. So, I would expect the following to take all elements from the source:
Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()
GraphDSL version works as I would expect:
val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()
Upvotes: 3
Views: 500
Reputation: 7275
The reason is that Sink.head
consumes single element and completes itself. This is propagated upstream in the form of a cancel
and no elements are sent from the Source after this.
Code from akka.stream.impl.HeadOptionStage.onPush
shows it
def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
Where completeStage
Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.
Update
alsoTo
is a broadcast that is configured with these parameters:
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
Your GraphDSL
version works differently because by default broadcast is eagerCancel = false
.
Where eagerCancel
if true, broadcast cancels upstream if any of its downstreams cancel.
Upvotes: 2