Reputation: 483
I have a Source[String, NotUsed] and a Flow[String, Int],
and I want to send the elements to two different Sinks according to a condition.
Is it better to do it like:
source ~> flow
flow.filter(cond) ~> sink1
flow.filter(!cond) ~> sink2
Or to use a Broadcast, like:
source ~> flow ~> broadcast
broadcast ~> sink1.filter(cond)
broadcast ~> sink2.filter(!cond)
???
What I am actually asking is: what is the best (fastest) way to send each element to the appropriate sink according to a condition?
Is there a way to do it like:
if(cond)
flow ~> sink1
else
flow ~> sink2
without having to run every element through a filter twice?
Upvotes: 1
Views: 197
Reputation: 19517
You could use Partition. The following example partitions a source of integers into two sinks: one for even numbers, the other for odd numbers:
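import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

val intSource = Source(1 to 100)
val sink1 = Sink.seq[Int] // receives the even numbers
val sink2 = Sink.seq[Int] // receives the odd numbers

val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) {
  implicit builder => (s1, s2) =>
    import GraphDSL.Implicits._
    // The partitioner returns the index of the output port for each element.
    val partition = builder.add(Partition[Int](2, i => i % 2))
    intSource ~> partition.in
    partition.out(0) ~> s1.in
    partition.out(1) ~> s2.in
    ClosedShape
})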
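Applied to the setup in the question, the same idea could look roughly like the sketch below. The concrete `source`, `flow` and `cond` are hypothetical stand-ins (the question does not show them), and the sketch assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to materialize the graph. The condition is evaluated once per element, so nothing passes through two filters.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionByCondition {
  // Hypothetical stand-ins for the question's source, flow and condition.
  val source: Source[String, NotUsed] = Source(List("a", "bb", "ccc", "dddd"))
  val flow: Flow[String, Int, NotUsed] = Flow[String].map(_.length)
  val cond: Int => Boolean = _ > 2

  // Runs the graph and returns (elements matching cond, all other elements).
  def run()(implicit system: ActorSystem): (Seq[Int], Seq[Int]) = {
    val sink1 = Sink.seq[Int]
    val sink2 = Sink.seq[Int]

    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) {
      implicit builder => (s1, s2) =>
        import GraphDSL.Implicits._
        // Port 0 when the condition holds, port 1 otherwise.
        val partition = builder.add(Partition[Int](2, i => if (cond(i)) 0 else 1))
        source ~> flow ~> partition.in
        partition.out(0) ~> s1.in
        partition.out(1) ~> s2.in
        ClosedShape
    })

    val (matched, rest) = graph.run()
    // Blocking is only for the sake of a short example.
    (Await.result(matched, 5.seconds), Await.result(rest, 5.seconds))
  }
}
```

Compared with the Broadcast-plus-filter variant, each element is routed to exactly one output instead of being copied to both branches and then dropped by one of them.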
Upvotes: 1