Reputation: 132
Suppose I have two infinite sources of the same type which could be connected to one graph. I want to switch between them from outside the already materialized graph, perhaps in the same way that a KillSwitch makes it possible to shut one of them down.
val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???

// Desired API (pseudocode): Switcher and withSwitcher are what I would like to have
val (switcher: Switcher, source: Source[ByteString, NotUsed]) =
  Source.combine(source1, source2).withSwitcher.run()

switcher.switch()
By default I want to consume data from source1, and after the switch I want to consume data from source2.
source1
        \
         switcher ~> source
        /
source2
Is it possible to implement this logic with Akka Streams?
Upvotes: 1
Views: 278
Reputation: 132
OK, after some time I found a solution.
The principle is the same as VLAN tagging: tag the elements of each source, then merge them through a MergeHub. Downstream, filter the merged stream by tag and strip the tag, which yields the result as a single Source.
Switching from one source to the other then only requires changing the filter condition.
source1.map(s => (tag1, s))
                            \
                             MergeHub.filter(_._1 == tagX).map(_._2) -> Source
                            /
source2.map(s => (tag2, s))
Here is an example:
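import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}
import akka.util.ByteString

object SomeSource {
  // Assumes Akka 2.6+, where the implicit ActorSystem supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("switcher")

  // Written by the caller and read on the stream's thread, hence @volatile.
  @volatile private var current = "tag1"

  val source1: Source[ByteString, NotUsed] = ???
  val source2: Source[ByteString, NotUsed] = ???

  // Toggle the tag that the filter lets through.
  def switch(): Unit =
    current = if (current == "tag1") "tag2" else "tag1"

  // sink: Sink[(String, ByteString), NotUsed], source: Source[ByteString, NotUsed]
  val (sink, source) =
    MergeHub.source[(String, ByteString)]
      .filter(_._1 == current)
      .map(_._2)
      .toMat(BroadcastHub.sink[ByteString])(Keep.both)
      .run()

  // Both sources keep running; elements with the inactive tag are dropped.
  source1.map(s => ("tag1", s)).runWith(sink)
  source2.map(s => ("tag2", s)).runWith(sink)
}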
SomeSource.source // do something with Source
SomeSource.switch() // then switch
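To try the tag-and-filter idea end to end, here is a minimal self-contained sketch. It is only an illustration under some assumptions: Akka 2.6 or later, String elements instead of ByteString, and two hypothetical tick sources ("from-a" and "from-b") standing in for source1 and source2. The names SwitchDemo, active, sourceA and sourceB are made up for this sketch, and an AtomicReference replaces the mutable field so the tag can be changed safely from any thread.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

object SwitchDemo {
  // Returns the elements observed before and after the switch.
  def run(): (Seq[String], Seq[String]) = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("switch-demo")
    try {
      // Tag currently allowed through the filter.
      val active = new AtomicReference[String]("a")

      // Hypothetical infinite sources standing in for source1 and source2.
      val sourceA = Source.tick(0.millis, 10.millis, "from-a")
      val sourceB = Source.tick(0.millis, 10.millis, "from-b")

      val (sink, merged) =
        MergeHub.source[(String, String)]
          .filter { case (tag, _) => tag == active.get() }
          .map { case (_, element) => element }
          .toMat(BroadcastHub.sink[String])(Keep.both)
          .run()

      // Both producers stay attached; the filter discards the inactive one.
      sourceA.map(e => ("a", e)).runWith(sink)
      sourceB.map(e => ("b", e)).runWith(sink)

      val before = Await.result(merged.take(5).runWith(Sink.seq), 5.seconds)

      active.set("b") // the switch itself

      // Elements that passed the filter before the switch may still be
      // buffered in the hub, so skip them before collecting.
      val after = Await.result(
        merged.dropWhile(_ == "from-a").take(5).runWith(Sink.seq), 5.seconds)

      (before, after)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before switch: $before")
    println(s"after switch:  $after")
  }
}
```

Two things to keep in mind with this approach: the inactive source is still consumed and its elements are thrown away, and the switch is not instantaneous for a consumer, because elements that were already buffered downstream of the filter are still delivered.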
Upvotes: 2