Reputation: 1804
When using Akka Streams, is there any way to close/shutdown a stream that is not required anymore for resource cleanup?
EDIT: When the source is made up of an infinite stream, it may never be completed and I would like to stop it before the completed source.
Example usage:
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run(materializer)
Is there a way to shutdown the stream?
Upvotes: 3
Views: 2008
Reputation: 17953
You could run the Stream
on an independent ActorMaterializer
and call shutdown on the ActorMaterializer after a certain period of time:
val actorSystem = ActorSystem()
val temporaryStream = {
val localMat = ActorMaterializer()(actorSystem)
import actorSystem.dispatcher
actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() }
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run()(localMat)
}
Similarly you could return the ActorMaterializer instead of the materialized stream and shutdown the ActorMaterializer based on some external condition other than time.
Upvotes: 5