Reputation: 2023
I have a source, source, of type Source[ByteString, Any], and a sink, sink, of type Sink[ByteString, M], where M could be a Future[IOResult] or anything else. When I run the following:
source.runWith(sink)
I get M as the result, but no other useful information indicating whether the stream was successful. If M were guaranteed to be Future[IOResult], that would be great, but the sink comes from a generic typeclass, so the type can't be determined at compile time.
However, all instances of the typeclass are expected to report back on whether the operation was successful.
Or should I wrap the materialized type in a custom type, like Result[M], that I can easily manipulate? This might require a new typeclass of which every instance of M has to be a member (e.g. you would have to specify how Future[IOResult] converts to Result[Future[IOResult]], and likewise for every other possible instance of M).
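The wrapper idea above could be sketched as a small typeclass that turns each materialized type into a common completion signal. This is only a sketch under assumptions: the names OutcomeSketch, StreamOutcome, outcome, and report are hypothetical, and Future[Unit] is used as the common result type instead of a dedicated Result[M] wrapper to keep the example short. It uses only the standard library, so it shows the typeclass shape rather than anything Akka-specific.

```scala
import scala.concurrent.{ExecutionContext, Future}

object OutcomeSketch {
  // Hypothetical typeclass: how a materialized value M reports completion.
  trait StreamOutcome[M] {
    def outcome(m: M)(implicit ec: ExecutionContext): Future[Unit]
  }

  object StreamOutcome {
    // Any Future-based materialized value (Future[IOResult] would be covered
    // by this): the Future's success or failure is taken as the outcome and
    // the value itself is discarded.
    implicit def futureOutcome[A]: StreamOutcome[Future[A]] =
      new StreamOutcome[Future[A]] {
        def outcome(m: Future[A])(implicit ec: ExecutionContext): Future[Unit] =
          m.map(_ => ())
      }
  }

  // Generic caller: compiles only for types M that have an instance.
  def report[M](m: M)(implicit so: StreamOutcome[M], ec: ExecutionContext): Future[Unit] =
    so.outcome(m)
}
```

With this shape, every other materialized type would need its own instance, which is the extra work the question anticipates.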
Upvotes: 2
Views: 1121
Reputation: 141
runWith returns the sink's materialized value. When that value is a Future, it completes with Success or Failure, and we can use the onComplete callback to extract the value.
import scala.util.{Failure, Success}

// onComplete needs an implicit ExecutionContext in scope
source.runWith(sink).onComplete {
  case Success(value) => logger.info(s"stream completed successfully $value")
  case Failure(e) => logger.error(s"stream completed with failure: $e")
}
Upvotes: 2
Reputation: 964
You could use a termination watcher on the source, and/or an onComplete on the stream.
source
  .watchTermination() { (_, done) =>
    done.onComplete {
      case Success(_) => logger.info("source completed successfully")
      case Failure(e) => logger.error(s"source completed with failure: $e")
    }
  }
  .runWith(sink)
  .onComplete {
    case Success(_) => logger.info("stream completed successfully")
    case Failure(e) => logger.error(s"stream completed with failure: $e")
  }
See https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html
Upvotes: 3
Reputation: 2023
I managed to make it work. I made it a requirement that the sink should have a future as the auxiliary value:
Sink[ByteString, Future[M]]
This way, any instance of the typeclass that returns this kind of sink is forced to wrap the materialized value in a Future. For file-based IO this is not an issue, because it already returns Future[IOResult].
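A minimal sketch of what that requirement looks like at the call site, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to provide the materializer). The names FutureSinkSketch and runToFuture are hypothetical and not part of Akka or of the original typeclass.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object FutureSinkSketch {
  // Hypothetical helper: because the sink must materialize a Future[M],
  // the caller can always observe completion, whatever M turns out to be.
  def runToFuture[M](
      source: Source[ByteString, Any],
      sink: Sink[ByteString, Future[M]]
  )(implicit system: ActorSystem): Future[M] =
    source.runWith(sink)
}
```

For example, Sink.fold[Int, ByteString](0)(_ + _.length) materializes a Future[Int] and can be passed directly, as can a FileIO.toPath sink, which materializes Future[IOResult]. A sink whose materialized value is not a Future would first have to be adapted, for instance with mapMaterializedValue.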
Upvotes: 0