Loic
Loic

Reputation: 3370

Consume a source with two sinks and get the result of one sink

I'd like to consume a Source with two different sinks.

Simplified example:

val source = Source(1 to 20)

val addSink = Sink.fold[Int, Int](0)(_ + _)
val subtractSink = Sink.fold[Int, Int](0)(_ - _)

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in

  bcast.out(0) ~> addSink
  bcast.out(1) ~> subtrackSink

  ClosedShape
}

RunnableGraph.fromGraph(graph).run()

val result: Future[Int] = ???

I need to be able to retrieve the result of addSink. RunnableGraph.fromGraph(graph).run() gives me NotUsed, but I'd like to get an Int (the result of the first fold Sink). Is it possible?

Upvotes: 2

Views: 132

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19497

Pass in both sinks to the graph builder's create method, which gives you access to their respective materialized values:

val graph = GraphDSL.create(addSink, subtractSink)((_, _)) { implicit builder =>
  (aSink, sSink) =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in
  bcast.out(0) ~> aSink
  bcast.out(1) ~> sSink
  ClosedShape
}

val (addResult, subtractResult): (Future[Int], Future[Int]) =
  RunnableGraph.fromGraph(graph).run() 

Alternatively, you can forgo the graph DSL and use alsoToMat:

val result: Future[Int] =
  Source(1 to 20)
    .alsoToMat(addSink)(Keep.right)
    .toMat(subtractSink)(Keep.left)
    .run()

The above gives you the materialized value of addSink. If you want to get the materialized value of both addSink and subtractSink, use Keep.both:

val (addResult, subtractResult): (Future[Int], Future[Int]) =
  Source(1 to 20)
    .alsoToMat(addSink)(Keep.right)
    .toMat(subtractSink)(Keep.both) // <--
    .run()

Upvotes: 4

Related Questions