Spar

Reputation: 483

How to send elements from a Source/Flow to 2 or more Sinks according to a condition?

I have a Source[String, NotUsed] and a Flow[String, Int], and I want to send the elements to two different Sinks depending on a condition.

Is it better to filter the flow's output twice, like this:

source ~> flow
flow.filter(cond) ~> sink1
flow.filter(!cond) ~> sink2

Or to use a Broadcast, like this:

source ~> flow ~> broadcast
broadcast ~> Flow[Int].filter(cond) ~> sink1
broadcast ~> Flow[Int].filterNot(cond) ~> sink2

What I am actually asking is: what is the best (fastest) way to send each element to the appropriate sink according to a condition?

Is there a way to do something like:

if(cond)
   flow ~> sink1
else
   flow ~> sink2

without having to run every element through a filter twice?

Upvotes: 1

Views: 197

Answers (1)

Jeffrey Chung

Reputation: 19517

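You could use Partition. It evaluates the partitioner function once per element and emits the element on the output port whose index the function returns, so nothing is filtered twice. The following example partitions a source of integers into two sinks, one for even numbers and the other for odd numbers: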

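import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

val intSource = Source(1 to 100)
val sink1 = Sink.seq[Int] // collects the even numbers
val sink2 = Sink.seq[Int] // collects the odd numbers

// Passing both sinks to create() exposes their materialized values as a tuple of Futures.
val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) {
  implicit builder => (s1, s2) =>
    import GraphDSL.Implicits._

    // The partitioner returns the index of the output port: 0 for even, 1 for odd.
    val partition = builder.add(Partition[Int](2, i => i % 2))

    intSource ~> partition.in
    partition.out(0) ~> s1.in
    partition.out(1) ~> s2.in
    ClosedShape
})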
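
To tie this back to the question, here is a minimal sketch of the same Partition graph wired to a Source[String, NotUsed] and a Flow[String, Int], then run to completion. The word list, the length-based flow, the `cond` predicate and the `PartitionSketch` object are hypothetical stand-ins, not taken from the question, and the sketch assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionSketch {
  // Hypothetical condition: stands in for whatever `cond` is in the question.
  def cond(i: Int): Boolean = i > 2

  // Routes the length of each word to one of two sinks and returns both results.
  def run(words: List[String]): (Seq[Int], Seq[Int]) = {
    // Assumes Akka 2.6+, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    try {
      val source = Source(words)               // Source[String, NotUsed]
      val flow   = Flow[String].map(_.length)  // Flow[String, Int, NotUsed]

      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) {
          implicit builder => (s1, s2) =>
            import GraphDSL.Implicits._

            // cond is evaluated once per element: port 0 if it holds, port 1 otherwise.
            val partition =
              builder.add(Partition[Int](2, i => if (cond(i)) 0 else 1))

            source ~> flow ~> partition.in
            partition.out(0) ~> s1.in
            partition.out(1) ~> s2.in
            ClosedShape
        }
      )

      // The materialized value is a pair of Futures, one per Sink.seq.
      val (matchedF, restF) = graph.run()
      (Await.result(matchedF, 5.seconds), Await.result(restF, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

Calling `PartitionSketch.run(List("a", "bb", "ccc", "dddd"))` should return the lengths that satisfy `cond` in the first sequence and the remaining lengths in the second. When there are exactly two destinations, the `divertTo(sink, predicate)` operator on Source and Flow may be a simpler option that avoids the GraphDSL entirely.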

Upvotes: 1
