tmouron

Reputation: 110

With Akka Stream, how to dynamically duplicate a flow?

I'm running a live video streaming server. The video source produces Array[Byte] frames, and I can't open two connections to it. I want every client that connects to my server to receive this same stream, with a buffer that discards old frames.

I tried using a BroadcastHub like this:

  val source =
    Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)

  val runnableGraph =
    source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client A reading frame #$index")
  }).run()

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client B reading frame #$index")
  }).run()

I get:

client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3

The main stream is partitioned between the two clients, whereas I'd expect both clients to see every frame of the source stream.

Did I miss something, or is there another solution?

Upvotes: 0

Views: 387

Answers (1)

Tomer Shetah

Reputation: 8529

The issue is the combination of Iterator with BroadcastHub. I assume your myVideoStreamingSource is something like:

val myVideoStreamingSource = Iterator("A","B","C","D","E")

I'll now quote from the documentation of BroadcastHub.sink:

Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own [[Source]] for consuming the [[Sink]] of that materialization.

The catch in your case is that runnableGraph.run() is called twice, so you get two independent hubs, and both of them read from the same underlying iterator.
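To illustrate the quoted behaviour, here is a minimal sketch that materializes the hub only once and attaches both clients to the Source it returns. It is not taken from the question: the object name SharedHubSketch, the integer "frames" standing in for the video source, and the buffer size are all hypothetical, and the first line is a scala-cli dependency directive whose Akka version is an assumption.

```scala
//> using dep "com.typesafe.akka::akka-stream:2.6.20"
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SharedHubSketch {
  // Returns the first three frames seen by each of two clients.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("hub-sketch")
    try {
      // Hypothetical stand-in for the single video source: an endless, single-pass iterator.
      val frames = Iterator.from(0)

      // Materialize the hub exactly once; run() yields a Source that can be attached to many times.
      val hubSource =
        Source.fromIterator(() => frames)
          .toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.right)
          .run()

      // Both clients attach to the SAME materialized Source, not to a second run of the graph.
      val clientA = hubSource.take(3).runWith(Sink.seq[Int])
      val clientB = hubSource.take(3).runWith(Sink.seq[Int])

      (Await.result(clientA, 5.seconds), Await.result(clientB, 5.seconds))
    } finally system.terminate()
  }
}
```

Calling SharedHubSketch.run() should give each client three consecutive frames rather than alternating ones. Where a client starts depends on when it attaches to the hub, so a late client may begin at a later frame.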

The thing with an iterator is that once you have consumed its data, you can't go back to the beginning. Add to that the fact that both graphs run in parallel, and it looks as if the elements are "divided" between the two. In fact, which client gets which element is not deterministic. For example, if you add a sleep of 1 second between starting Client A and Client B, then only Client A will print anything.

To get this to work, you need a source that can be iterated more than once, for example a Seq or a List. The following will do:

val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)
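The difference between a shared iterator and a collection that hands out a fresh iterator each time can be seen without Akka at all. The sketch below is a simplified, sequential model of two clients pulling in turn; the names IteratorVsSeq, interleave, shared and fresh are made up for illustration, and in the real stream the interleaving depends on scheduling.

```scala
object IteratorVsSeq {
  // Alternately pull up to `n` elements for two clients from the iterators they were given.
  def interleave[T](a: Iterator[T], b: Iterator[T], n: Int): (List[T], List[T]) = {
    val seenA = List.newBuilder[T]
    val seenB = List.newBuilder[T]
    for (_ <- 1 to n) {
      if (a.hasNext) seenA += a.next()
      if (b.hasNext) seenB += b.next()
    }
    (seenA.result(), seenB.result())
  }

  // Both clients share ONE single-pass iterator, so the elements are split between them.
  def shared(): (List[String], List[String]) = {
    val frames = Iterator("A", "B", "C", "D")
    interleave(frames, frames, 2)
  }

  // Each client gets a FRESH iterator from the Seq, so both see every element.
  def fresh(): (List[String], List[String]) = {
    val frames = Seq("A", "B", "C", "D")
    interleave(frames.iterator, frames.iterator, 4)
  }
}
```

Here IteratorVsSeq.shared() returns (List(A, C), List(B, D)), while IteratorVsSeq.fresh() returns (List(A, B, C, D), List(A, B, C, D)).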

Upvotes: 1
