Melvic Ybanez
Melvic Ybanez

Reputation: 2023

Akka Streams - How to check if a stream ran successfully?

I have a source source of type Source[ByteString, Any], and a sink sink of type Sink[ByteString, M], where M could be a Future[IOResult] or anything. When I run the following:

source.runWith(sink)

I get M as the result. I don't get any other useful information indicating that the stream was successful. If it's guaranteed to be Future[IOResult] it would be great, but the sink came from a generic typeclass and you can't determine the type at compile time.

However, all the instances of the typeclass are expected to report back on whether the operation was successful.

Or should I wrap the materialized type into a custom type, like Result[M], that I can easily manipulate? This might require a new typeclass of which instances of M have to be members (e.g. You should specify how Future[IOResult] converts to Result[Future[IOResult]]. Same for all the possible instances of M).

Upvotes: 2

Views: 1121

Answers (3)

Anand Singh
Anand Singh

Reputation: 141

runWith returns a future having Sucess or Failure. We can use onComplete callback to extract the value.

source.runWith(sink).onComplete {
  case Success(value) => logger.info(s"stream completed successfully $value")
  case Failure(e) => logger.error(s"stream completed with failure: $e")
}

Upvotes: 2

botkop
botkop

Reputation: 964

You could use a termination watcher on the source, and/or an onComplete on the stream.

source.watchTermination() { (_, done) =>
  done.onComplete {
    case Success(_) => logger.info("source completed successfully")
    case Failure(e) => logger.error(s"source completed with failure : $e")
  }
}
.runWith(sink)
.onComplete{
  case Success(_) => logger.info(s"stream completed successfully")
  case Failure(e) => logger.error(s"stream completed with failure: $e")
}

See https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html

Upvotes: 3

Melvic Ybanez
Melvic Ybanez

Reputation: 2023

I managed to make it work. I made it a requirement that the sink should have a future as the auxiliary value:

Sink[ByteString, Future[M]]

This way any instances of the typeclass that returns this kind of sink will be forced to wrap the materialized value in a Future. For a file-based IO, it's not an issue because it returns Future[IOResult] already.

Upvotes: 0

Related Questions