Reputation: 7488
I have a list of Futures, each of which completes either with a list of values or with a failure. I am trying to combine these futures so that all available values are captured and any errors are logged. This is my attempt:
val individualFutures: List[Future[Seq[Element]]] = ....
val allElements: Future[List[Element]] =
  Future.foldLeft(individualFutures)(List[Element]())((acc, elements) => acc ++ elements)
Try(Await.result(allElements, Duration.Inf)) match {
  case Success(elements) => ....
  case Failure(error) => ....log
}
But I don't think this is the right way to achieve my objective. I guess that as soon as one future completes with an error, the whole fold fails and the values from the other futures are lost.
How can I collect every available value and still log the errors?
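To make the concern concrete, here is a minimal sketch of the same fold with one failing future. The Element case class, the sample values and the "boom" exception are placeholders made up for illustration, not part of the real code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FoldLeftFailureDemo {
  // Hypothetical stand-in for the real Element type.
  final case class Element(id: Int)

  // Two successful futures and one failed future (sample data only).
  val individualFutures: List[Future[Seq[Element]]] = List(
    Future.successful(Seq(Element(1), Element(2))),
    Future.failed(new RuntimeException("boom")),
    Future.successful(Seq(Element(3)))
  )

  // Same fold as in the attempt above.
  val allElements: Future[List[Element]] =
    Future.foldLeft(individualFutures)(List.empty[Element])((acc, elements) => acc ++ elements)

  // The combined future fails, so none of the successful values are available here.
  val outcome: Try[List[Element]] = Try(Await.result(allElements, 5.seconds))
}
```

In this sketch outcome is a Failure carrying the original exception, even though two of the three futures completed successfully.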
Upvotes: 0
Views: 280
Reputation: 1132
I'm extending the other answer a little, but I think you may find it useful.
You can do something like the following:
Helper:
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object FutureSupport extends StrictLogging {

  // Waits for all futures, keeps the successful values and flags whether any future failed.
  def aggregateSuccessSequence[In](futures: List[Future[In]], logOnFailure: Boolean = false)(implicit ec: ExecutionContext): Future[AggregateResult[In]] = {
    // Turn every outcome into a Try so that Future.sequence never fails as a whole.
    val futureTries = futures.map(_.map(Success(_)).recover {
      case NonFatal(ex) =>
        if (logOnFailure) {
          logger.warn("FutureSupport: Failure occurred on aggregateSuccessSequence", ex)
        }
        Failure(ex)
    })
    Future.sequence(futureTries).map {
      // foldRight with prepend keeps the values in their original order.
      _.foldRight(AggregateResult.empty[In]) {
        case (curr, acc) =>
          curr match {
            case Success(res) => AggregateResult(acc.failureOccurred, res :: acc.outs)
            case Failure(_) => AggregateResult(failureOccurred = true, acc.outs)
          }
      }
    }
  }

  // Same idea, but applies f to each input; a None result is skipped without counting as a failure.
  def aggregateSuccessTraversal[In, Out](source: List[In])(f: In => Future[Option[Out]])(logOnFailure: Boolean = false)(implicit ec: ExecutionContext): Future[AggregateResult[Out]] = {
    if (source.isEmpty) {
      // Nothing to traverse: return the empty result directly.
      Future.successful(AggregateResult.empty[Out])
    } else {
      Future
        .traverse(source)(in => f(in).map(Success(_)).recover {
          case NonFatal(ex) =>
            if (logOnFailure) {
              logger.warn("FutureSupport: Failure occurred on aggregateSuccessTraversal", ex)
            }
            Failure(ex)
        })
        .map {
          _.foldRight(AggregateResult.empty[Out]) {
            case (curr, acc) =>
              curr match {
                case Failure(_) => acc.copy(failureOccurred = true)
                case Success(opt) => opt match {
                  case Some(success) => acc.copy(outs = success :: acc.outs)
                  case None => acc
                }
              }
          }
        }
    }
  }

  case class AggregateResult[Out](failureOccurred: Boolean, outs: List[Out])

  object AggregateResult {
    def empty[Out]: AggregateResult[Out] = AggregateResult(failureOccurred = false, Nil)
  }
}
Usage:
aggregateSuccessSequence(List(Future.successful(1), Future.failed(new Exception))).map {
  case AggregateResult(failureOccurred, outs) if failureOccurred => ...
  case AggregateResult(failureOccurred, outs) => ...
}
You can use aggregateSuccessTraversal in the same way.
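If you also want to keep the errors themselves rather than only a flag, the same idea can be sketched without any logging dependency. This is only a sketch: partitionResults and the demo method are hypothetical names I made up for illustration, and unlike the helper above it captures every failure stored in the future, not just the NonFatal ones.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object PartitionedFutures {
  // Hypothetical helper: waits for every future and splits the outcomes into (errors, values).
  def partitionResults[A](futures: List[Future[A]])(
      implicit ec: ExecutionContext): Future[(List[Throwable], List[A])] = {
    // Lift each future so that it always succeeds, carrying its outcome as a Try.
    val lifted: List[Future[Try[A]]] = futures.map(_.transform[Try[A]](t => Success(t)))
    Future.sequence(lifted).map { tries =>
      val errors = tries.collect { case Failure(ex) => ex }
      val values = tries.collect { case Success(v) => v }
      (errors, values)
    }
  }

  // Sample run with made-up data: two successes and one failure.
  def demo(): (List[String], List[Int]) = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val futures: List[Future[Int]] =
      List(Future.successful(1), Future.failed(new Exception("boom")), Future.successful(3))
    val (errors, values) = Await.result(partitionResults(futures), 5.seconds)
    (errors.map(_.getMessage), values)
  }
}
```

The caller can then log each entry of the error list with whatever logger is at hand, and the values keep the order of the input list.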
Upvotes: 0
Reputation: 166
Maybe you could traverse the list and recover from throwables by returning an empty list and logging a failure reason? At the end you would flatten the whole sequence.
val allElements: Future[List[Element]] = Future.traverse(individualFutures) {
  _.recover {
    case throwable: Throwable =>
      println(throwable.getMessage)
      Nil
  }
}.map(_.flatten)
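For anyone who wants to try this out, here is a self-contained sketch of the same approach. The Element case class, collectAvailable, demo and the sample data are placeholders I introduced for illustration; I also match on NonFatal instead of Throwable, which is a stylistic choice rather than a requirement.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RecoverAndFlatten {
  // Hypothetical stand-in for the question's Element type.
  final case class Element(id: Int)

  // Replaces every failed future with an empty list, then flattens all results.
  def collectAvailable(individualFutures: List[Future[Seq[Element]]])(
      implicit ec: ExecutionContext): Future[List[Element]] =
    Future.traverse(individualFutures) {
      _.recover {
        case NonFatal(error) =>
          // println stands in for a real logger here.
          println(s"Future failed: ${error.getMessage}")
          Nil
      }
    }.map(_.flatten)

  // Sample run with made-up data: the middle future fails.
  def demo(): List[Element] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val futures: List[Future[Seq[Element]]] = List(
      Future.successful(Seq(Element(1), Element(2))),
      Future.failed(new RuntimeException("boom")),
      Future.successful(Seq(Element(3)))
    )
    Await.result(collectAvailable(futures), 5.seconds)
  }
}
```

With this sample data, demo() returns Elements 1, 2 and 3 in input order and logs the one failure, so a single failed future no longer discards the rest.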
Upvotes: 2