tonicebrian
tonicebrian

Reputation: 4795

Is it possible to spawn Flows in Akka-Streams based on another stream?

Following example of BroadcastHub it is possible to spawn dynamically workers that listen to the same producer. But this spawning must be done explicitly in code. I wonder if it can be coded as a reaction of an event in a stream.

In the example below I would like to spawn 2 more workers after receiving the "Spawn" message in the spawns stream. Is it possible?

package com.example

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

import scala.concurrent.duration._

object TestApp extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  val ticks = Source.tick(0.second, 1.second, "Tick").take(10)

  val broadcaster = ticks.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

  def prefixFlow(tag:String) = Flow[String].map(_ + s" from $tag").to(Sink.foreach(println))

  // Print out messages from the producer in two independent consumers
  broadcaster.runWith(prefixFlow("1"))
  broadcaster.runWith(prefixFlow("2"))

  // Is it possible to spawn more flows based on another stream?
  val spawns = Source.tick(2.second, 3.second, "Spawn").take(2)
  // spawns.foreach(broadcaster.runWith(prefixFlow("XXX"))
}

Upvotes: 0

Views: 98

Answers (1)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

In your specific example, could you be looking for a simple map?

val spawns = Source
  .tick(2.second, 3.second, "Spawn")
  .take(2)
  .map(_ ⇒ broadcaster.runWith(prefixFlow("XXX")))
  .runWith(Sink.ignore)

Upvotes: 2

Related Questions