Reputation: 3369
I am using an API that accepts a single AKKA Sink and fills it with data:
def fillSink(sink:Sink[String, _])
Is there a way, without delving into the depths of akka, to handle the output with two sinks instead of one?
For example
val mySink1:Sink = ...
val mySink2:Sink = ...
//something
fillSink( bothSinks )
If I had access to the Flow used by the fillSink
method I could use flow.alsoTo(mySink1).to(mySink2)
but the flow is not exposed.
The only workaround at the moment is to pass a single Sink which handles the strings and passes it on to two StringBuilders to replace mySink1/mySink2
, but it feels like that defeats the point of AKKA. Without spending a couple days learning AKKA, I can't tell if there is a way to split output from sinks.
Thanks!
Upvotes: 4
Views: 277
Reputation: 22449
The combine
Sink operator, which combines two or more Sinks using a provided Int => Graph[UniformFanOutShape[T, U], NotUsed]]
function, might be what you're seeking:
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]
A trivialized example:
val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
val tripleSink = Sink.foreach[Int](i => println(s" Triper: ${i*3}"))
val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))
Source(List(1, 2, 3)).runWith(combinedSink)
// Doubler: 2
// Triper: 3
// Doubler: 4
// Triper: 6
// Doubler: 6
// Triper: 9
Upvotes: 5