Reputation: 463
I'm designing a small tool that will generate CSV test data. I want to use Akka Streams (1.0-RC4) to implement the data flow. There will be a Source that generates random numbers, a transformation into CSV strings, some rate limiter and a Sink that writes into a file.
Also there should be a clean way of stopping the tool using a small REST interface.
This is where I'm struggling. After the stream has been started ( there seems to be no way of stopping it. Source and Sink are infinite (at least until disk runs full :)) so they will not stop the stream.
Adding control logic to Source or Sink feels wrong. Using ActorSystem.shutdown() too. What would be a good way of stopping the stream?
Upvotes: 8
Views: 5286
Reputation: 9524
You can use Akka KillSwitches to abort (fail) or shutdown a stream.
There are two types of killswitches:
Code examples are available in the links, but here is an example of aborting multiple streams with a shared killswitch:
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val error = new RuntimeException("boom!")
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error
Upvotes: 0
Reputation: 1833
Not exactly stopping, but limiting. You can use limit
or take
Example from Streams Cookbook:
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
Upvotes: 0
Reputation: 463
Ok, so I found a decent solution. It was already sitting there under my nose, I just did not see it. Source.lazyEmpty
materializes into a promise that when completed will terminate the Source and the stream behind it.
The remaining question is, how to include it into the infinite stream of random numbers. I tried Zip
. The result was that no random numbers made it through the stream because lazyEmpty
never emits values (doh). I tried Merge
but the stream never terminated because Merge
continues until all sources have completed.
So I wrote my own merge. It forwards all values from one of the input ports and terminates when any source completed.
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(),"StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read( { (ctx, input, element) =>
override def initialCompletionHandling = eagerClose
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(, stopperMerge.out)
The flow can be plugged into any stream. When materialized it will return a Promise
that terminates the stream on completion. Here's my test for it.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) =
val _ = promise.success(())
I hope this is useful for other too. Still leaves me puzzled why there is no built in way for terminating streams from outside of the stream.
Upvotes: 10