Reputation: 5728
Suppose there's an Akka stream flow defined like this:
def tee = {
  var writer: Writer = ???
  Flow.fromFunction[String, String] { msg =>
    writer.write(msg)
    msg
  }
}
When the upstream finishes, the writer needs to be flushed and closed. Is there a way to do this without resorting to GraphStageLogic
etc., as described here: https://doc.akka.io/docs/akka/current/stream/stream-customize.html?
Upvotes: 0
Views: 123
Reputation: 7275
This can't be done with the Flow as written; it needs to be converted to a Sink.
If a Sink is an option, do the following:
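import java.io.{StringWriter, Writer}
import akka.stream.scaladsl.Sink
// Needed by andThen; the actor system's dispatcher works as well
import scala.concurrent.ExecutionContext.Implicits.global

def tee = {
  val writer: Writer = new StringWriter()
  Sink
    .foreach[String] { msg =>
      writer.write(msg)
    }
    // Runs once the stream terminates, whether it completed or failed
    .mapMaterializedValue(_.andThen { case _ =>
      writer.close() // close() flushes first
    })
}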
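If the stage still has to behave like a tee, that is, pass every message downstream, a Sink like the one above can be attached as a side branch with alsoToMat. The following is only a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer); the names TeeExample, writerSink and teeFlow are made up for illustration.

```scala
import java.io.{StringWriter, Writer}

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TeeExample {

  // Hypothetical helper: writes every element and closes the writer
  // once the stream terminates (on completion or on failure).
  def writerSink(writer: Writer)(implicit ec: ExecutionContext): Sink[String, Future[Done]] =
    Sink
      .foreach[String](msg => writer.write(msg))
      .mapMaterializedValue(_.andThen { case _ => writer.close() })

  // Hypothetical helper: a Flow that passes elements through unchanged
  // while also sending each one to the writer sink.
  def teeFlow(writer: Writer)(implicit ec: ExecutionContext): Flow[String, String, Future[Done]] =
    Flow[String].alsoToMat(writerSink(writer))(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("tee-example")
    import system.dispatcher

    val writer = new StringWriter()

    val (written, result) =
      Source(List("a", "b", "c"))
        .viaMat(teeFlow(writer))(Keep.right)
        .toMat(Sink.seq[String])(Keep.both)
        .run()

    // `written` completes after the side branch has finished and close() has run
    Await.result(written, 3.seconds)
    println(writer.toString)
    println(Await.result(result, 3.seconds))

    system.terminate()
  }
}
```

The materialized Future[Done] can be used to wait until the writer has been closed; if that isn't needed, alsoTo can replace alsoToMat.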
Something similar can be done with akka.stream.scaladsl.StreamConverters, which closes the OutputStream when the stream completes,
as follows:
val sink: Sink[String, Future[IOResult]] =
  StreamConverters
    .fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer))
    .contramap[String](ByteString.apply)
Upvotes: 1