Reputation: 2682
In Akka Streams 2.4.2, PushStage has been deprecated. With Streams 2.0.3 I was using the solution from this answer:
How does one close an Akka stream?
which was:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
  override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
    case elem if shouldCloseStream ⇒
      // println("stream closed")
      ctx.finish()
    case elem ⇒
      ctx.push(elem)
  }
}
How would I close a stream immediately in 2.4.2, from inside a GraphStage's onPush()?
Upvotes: 3
Views: 1156
Reputation: 2682
Posting for other people's reference. sschaef's answer is correct procedurally, but in my case the connection was kept open for about a minute, and eventually it would time out and throw a "no activity" exception, which closed the connection.
On reading the docs further, I noticed that the connection is only closed once all upstream flows have completed. In my case, I had more than one upstream.
For my particular use case, the fix was to add eagerComplete = true to the Merge, so that the stream closes as soon as any (rather than all) upstream completes. Something like:
... = builder.add(Merge[MyObj](3,eagerComplete = true))
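To make the effect of the flag easier to see, here is a minimal self-contained sketch. It is not the original graph: the object name EagerMergeDemo, the two sources and the 3 second timeout are made up for illustration, and it assumes the 2.4.x-era API where an explicit ActorMaterializer is created. One input is finite and the other (Source.maybe) never completes on its own, so without eagerComplete = true the merged stream would stay open.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original answer.
object EagerMergeDemo {
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("eager-merge-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val merged: Source[Int, NotUsed] =
        Source.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._
          // eagerComplete = true: finish as soon as ANY input finishes
          val merge = builder.add(Merge[Int](2, eagerComplete = true))
          Source(1 to 3)    ~> merge // finite upstream
          Source.maybe[Int] ~> merge // never completes by itself
          SourceShape(merge.out)
        })

      val collected =
        merged.runWith(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))
      // Throws a TimeoutException if the merged stream never completes.
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

If eagerComplete is left at its default of false, the Await in this sketch should time out, because the second input never finishes.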
Hope this helps someone.
Upvotes: 1
Reputation: 53358
Use something like this:
import akka.stream._        // Attributes, FlowShape, Inlet, Outlet
import akka.stream.stage._  // GraphStage, GraphStageLogic, InHandler, OutHandler
val closeStage = new GraphStage[FlowShape[Tpe, Tpe]] {
  val in = Inlet[Tpe]("closeStage.in")
  val out = Outlet[Tpe]("closeStage.out")
  override val shape = FlowShape.of(in, out)
  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = grab(in) match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          completeStage() // cancels upstream and completes downstream
        case msg ⇒
          push(out, msg)
      }
    })
    setHandler(out, new OutHandler {
      // forward downstream demand to upstream
      override def onPull(): Unit = pull(in)
    })
  }
}
It is more verbose, but on the one hand this logic can be defined in a reusable way, and on the other hand one no longer has to worry about differences between the stream elements, because the GraphStage can be handled in the same way as a flow would be handled:
val flow: Flow[Tpe, Tpe, akka.NotUsed] = ??? // Flow takes In, Out and Mat type parameters
val newFlow = flow.via(closeStage)
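For anyone who wants to try this end to end, here is a minimal runnable sketch of the same idea. The class CloseWhen, the object CloseWhenDemo and the predicate `_ > 3` are hypothetical names chosen for illustration (they stand in for closeStage and shouldCloseStream above), and the code assumes the 2.4.x-era API with an explicit ActorMaterializer.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical reusable stage: completes the stream on the first
// element for which `shouldClose` returns true.
final class CloseWhen[T](shouldClose: T => Boolean)
    extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("CloseWhen.in")
  val out: Outlet[T] = Outlet[T]("CloseWhen.out")
  override val shape: FlowShape[T, T] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (shouldClose(elem)) completeStage() // matching element is dropped
          else push(out, elem)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

object CloseWhenDemo {
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("close-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val collected = Source(1 to 10)
        .via(new CloseWhen[Int](_ > 3))
        .runWith(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))
      Await.result(collected, 3.seconds) // Vector(1, 2, 3)
    } finally system.terminate()
  }
}
```

Taking the predicate as a constructor argument is just one way to make the stage reusable; the element that triggers the close is not forwarded, which matches the behaviour of the stage above.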
Upvotes: 3