Reputation: 463
I'm designing a small tool that will generate CSV test data. I want to use Akka Streams (1.0-RC4) to implement the data flow. There will be a Source that generates random numbers, a transformation into CSV strings, a rate limiter, and a Sink that writes to a file.
Also there should be a clean way of stopping the tool using a small REST interface.
This is where I'm struggling. After the stream has been started (RunnableFlow.run()) there seems to be no way of stopping it. Source and Sink are infinite (at least until disk runs full :)) so they will not stop the stream.
Adding control logic to the Source or Sink feels wrong, and so does calling ActorSystem.shutdown(). What would be a good way of stopping the stream?
Upvotes: 8
Views: 5286
Reputation: 9524
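You can use Akka KillSwitches to abort (fail) or shut down a stream.
There are two types of kill switches: UniqueKillSwitch, which controls a single materialized stream, and SharedKillSwitch, which can control any number of streams.
The Akka documentation has code examples for both, but here is an example of aborting multiple streams with a shared kill switch: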
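import akka.stream.KillSwitches
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
// Assumes an implicit materializer in scope and ScalaTest matchers for shouldBe.
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")
// Both streams pass through the same switch.
val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
// Aborting the switch fails every stream attached to it.
val error = new RuntimeException("boom!")
sharedKillSwitch.abort(error)
Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error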
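For the tool described in the question, a single stream is enough, so a UniqueKillSwitch obtained from KillSwitches.single may be the simpler fit. The following is only a minimal sketch under some assumptions: it targets Akka 2.6 or later (where an implicit ActorSystem provides the materializer), and the names KillSwitchSketch and generateFor, the three-column CSV layout, and the rate of 100 lines per second are made up for illustration. Sink.seq stands in for the file sink so the result is easy to inspect.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical helper object, not part of Akka.
object KillSwitchSketch {

  /** Runs an endless CSV generator for roughly `millis` ms, then stops it from outside. */
  def generateFor(millis: Long): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("csv-generator")
    try {
      // Infinite source of CSV lines with three random integers each (assumed layout),
      // rate limited to 100 lines per second (assumed rate).
      val csvLines = Source
        .fromIterator(() => Iterator.continually(Seq.fill(3)(Random.nextInt(100)).mkString(",")))
        .throttle(100, 1.second)

      // Keep.right exposes the UniqueKillSwitch, Keep.both adds the sink's Future.
      val (killSwitch, collected) = csvLines
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

      Thread.sleep(millis)
      // shutdown() completes downstream and cancels upstream without an error.
      killSwitch.shutdown()
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

In a real tool the call to killSwitch.shutdown() would sit behind the REST endpoint instead of a Thread.sleep, and the sink would write to a file.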
Upvotes: 0
Reputation: 1833
Not exactly stopping, but limiting: you can use limit or take.
Example from the Streams Cookbook:
val MAX_ALLOWED_SIZE = 100
// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)
// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)
Upvotes: 0
Reputation: 463
Ok, so I found a decent solution. It was sitting right under my nose; I just did not see it. Source.lazyEmpty materializes into a promise that, when completed, terminates the Source and the stream behind it.
The remaining question was how to include it in the infinite stream of random numbers. I tried Zip. The result was that no random numbers made it through the stream, because lazyEmpty never emits values (doh). I tried Merge, but the stream never terminated, because Merge continues until all sources have completed.
So I wrote my own merge. It forwards all values from one of the input ports and terminates when any source completes.
object StopperFlow {
  private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
    val in = newInlet[A]("in")
    val stopper = newInlet[Unit]("stopper")
    override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
  }
  private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
    new StopperMergeShape(), Attributes.name("StopperMerge")) {
    import FlexiMerge._
    override def createMergeLogic(p: PortT) = new MergeLogic[In] {
      override def initialState =
        State[In](Read(p.in)) { (ctx, input, element) =>
          ctx.emit(element)
          SameState
        }
      override def initialCompletionHandling = eagerClose
    }
  }
  def apply[In](): Flow[In, In, Promise[Unit]] = {
    val stopperSource = Source.lazyEmpty[Unit]
    Flow(stopperSource) { implicit builder =>
      stopper =>
        val stopperMerge = builder.add(new StopperMerge[In]())
        stopper ~> stopperMerge.stopper
        (stopperMerge.in, stopperMerge.out)
    }
  }
}
The flow can be plugged into any stream. When materialized, it returns a Promise that terminates the stream on completion. Here's my test for it.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
  val timeSinceStart = System.currentTimeMillis() - startTime
  System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
I hope this is useful for others too. It still leaves me puzzled why there is no built-in way of terminating streams from outside the stream.
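For readers on later Akka versions, the same idea can probably be expressed without a custom merge stage. The sketch below assumes Akka 2.6 or later, where Source.lazyEmpty has been replaced by Source.maybe (materializing a Promise[Option[T]]) and mergeMat accepts an eagerComplete flag that plays the role of eagerClose above. The names MaybeStopperSketch and generateFor and the throttle rate are invented for illustration, and Sink.seq replaces the console sink so the output can be checked.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical helper object, not part of Akka.
object MaybeStopperSketch {

  /** Emits random floats for roughly `millis` ms, then stops the stream via a promise. */
  def generateFor(millis: Long): Seq[Float] = {
    implicit val system: ActorSystem = ActorSystem("maybe-stopper")
    try {
      val randomSource = Source
        .fromIterator(() => Iterator.continually(Random.nextFloat()))
        .throttle(100, 1.second) // assumed rate, only to keep the output small

      // Source.maybe never emits until its promise is completed.
      // eagerComplete = true makes the merge finish as soon as either input completes.
      val (stopper, collected) = randomSource
        .mergeMat(Source.maybe[Float], eagerComplete = true)(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

      Thread.sleep(millis)
      // Completing with None ends the maybe source without emitting an element.
      stopper.success(None)
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

As with the StopperFlow above, whoever holds the promise can end the stream, so it could be handed to a REST handler.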
Upvotes: 10