Yuriy Gatilin
Yuriy Gatilin

Reputation: 132

How to switch between multiple Sources?

Suppose I have two infinite sources of the same type witch could be connected to the one Graph. I want to switch between them from outside already materialized graph, might be the same way as it possible to shutdown one of them with KillSwitch.

val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???

val (switcher: Switcher, source: Source[ByteString, NotUsed]) = 
    Source.combine(source1,source2).withSwitcher.run()

switcher.switch()

By default I want to use source1 and after switch I want to consume data from source2

source1 
        \
          switcher ~> source    

source2

Is it possible to implement this logic with Akka Streams?

Upvotes: 1

Views: 278

Answers (1)

Yuriy Gatilin
Yuriy Gatilin

Reputation: 132

Ok, after some time I found the solution.

So here I can use the same principle as we have in VLAN. I just need to tag my sources and then pass them through MergeHub. After that it's easy to filter those sources by tag and produce right result as Source.

All that I need to switch from one to another Source is a change of filter condition.

source1.map(s => (tag1, s))
                           \
                             MergeHub.filter(_._1 == tagX).map(_._2) -> Source
                           /
source2.map(s => (tag2, s))

Here is some example:

object SomeSource {

  private var current = "tag1"

  val source1: Source[ByteString, NotUsed] = ???
  val source2: Source[ByteString, NotUsed] = ???

  def switch = {
     current = if (current == "tag1") "tag2" else "tag1"
  }

  val (sink: Sink[(String, ByteString), NotUsed], 
       source: Source[ByteString, NotUsed]) =
    MergeHub.source[(String, ByteString)]
      .filter(_._1 == current)
      .via(Flow[(String, ByteString)].map(_._2))
      .toMat(BroadcastHub.sink[ByteString])(Keep.both).run()

  source1.map(s => ("tag1", s)).runWith(sink)
  source2.map(s => ("tag2", s)).runWith(sink)

}

SomeSource.source    // do something with Source

SomeSource.switch()  // then switch

Upvotes: 2

Related Questions