Mandroid

Reputation: 7488

Scala: Combining a list of Future for all successes and errors

I have a list of Futures, each of which completes either with a list of values or with a failure. I am trying to combine these futures so that all available values are captured and errors are logged. This is my attempt:

val individualFutures: List[Future[Seq[Element]]] = ....
val allElements: Future[List[Element]] =
  Future.foldLeft(individualFutures)(List[Element]())((acc, elements) => acc ++ elements)
Try(Await.result(allElements, Duration.Inf)) match {
  case Success(elements) => ....
  case Failure(error)    => ....log
}

But I don't think this is the right way to achieve my objective. I guess that as soon as one future completes with an error, the combined future fails and the values from the other futures are lost.
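A quick way to check that guess, as a minimal sketch: Future.foldLeft (Scala 2.12+) produces a failed future whenever any of its inputs has failed, so the values from the successful futures never reach the caller. The object name FoldLeftCheck and the sample values are made up for illustration, and Int stands in for Element.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical check, not part of the original code.
object FoldLeftCheck {
  def run(): Try[List[Int]] = {
    // Two successful futures and one failed one.
    val individualFutures: List[Future[Seq[Int]]] = List(
      Future.successful(Seq(1, 2)),
      Future.failed(new RuntimeException("boom")),
      Future.successful(Seq(3))
    )

    // Same shape as the attempt above: fold every result into one list.
    val allElements: Future[List[Int]] =
      Future.foldLeft(individualFutures)(List.empty[Int])((acc, elements) => acc ++ elements)

    // The combined future fails with the first error encountered in list order,
    // so the Try is a Failure and the successful values are not available.
    Try(Await.result(allElements, 5.seconds))
  }
}
```

Calling FoldLeftCheck.run() gives a Failure wrapping the RuntimeException, even though two of the three futures succeeded.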

How can I achieve the same?

Upvotes: 0

Views: 280

Answers (2)

Zvi Mints

Reputation: 1132

I'm extending the answer a little bit, but I think you may find it useful.

You can do something like the following:

  1. Create a helper object that holds the aggregation logic and logs failures. You may also want its result to tell you whether any failures occurred.
  2. Use this helper object to compute the combined result.

Helper:


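// StrictLogging is assumed to come from the scala-logging library.
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object FutureSupport extends StrictLogging {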
  
  def aggregateSuccessSequence[In](futures: List[Future[In]], logOnFailure: Boolean = false)(implicit ec: ExecutionContext): Future[AggregateResult[In]] = {
    val futureTries = futures.map(_.map(Success(_)).recover {
      case NonFatal(ex) =>
        if (logOnFailure) {
          logger.warn("FutureSupport: Failure occurred on aggregateSuccessSequence", ex)
        }
        Failure(ex)
    })
    Future.sequence(futureTries).map {
      _.foldRight(AggregateResult.empty[In]) {
        case (curr, acc) =>
          curr match {
            case Success(res) => AggregateResult(acc.failureOccurred, res :: acc.outs)
            case Failure(_) => AggregateResult(failureOccurred = true, acc.outs)
          }
      }
    }
  }

  def aggregateSuccessTraversal[In, Out](source: List[In])(f: In => Future[Option[Out]])(logOnFailure: Boolean = false)(implicit ec: ExecutionContext): Future[AggregateResult[Out]] = {
    // No special case for an empty source is needed: Future.traverse on an
    // empty list already completes with an empty result.

    Future
      .traverse(source)(in => f(in).map(Success(_)).recover {
        case NonFatal(ex) =>
          if (logOnFailure) {
            logger.warn("FutureSupport: Failure occurred on aggregateSuccessTraversal", ex)
          }
          Failure(ex)
      })
      .map {
        _.foldRight(AggregateResult.empty[Out]) {
          case (curr, acc) =>
            curr match {
              case Failure(_) => acc.copy(failureOccurred = true)
              case Success(opt) => opt match {
                case Some(success) => acc.copy(outs = success :: acc.outs)
                case None => acc
              }
            }
        }
      }
  }

  case class AggregateResult[Out](failureOccurred: Boolean, outs: List[Out])
  object AggregateResult {
    def empty[Out]: AggregateResult[Out] = AggregateResult(failureOccurred = false, Nil)
  }
}

Usage:

aggregateSuccessSequence(List(Future.successful(1), Future.failed(new Exception))).map {
  case AggregateResult(true, outs)  => ... // at least one future failed; outs holds the successful values
  case AggregateResult(false, outs) => ... // every future succeeded
}

aggregateSuccessTraversal can be used the same way.
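If you also want the errors themselves rather than just a flag, roughly the same idea can be sketched with the standard library alone. This is only an illustration under a few assumptions: CollectAllDemo, lift and collectAll are hypothetical names, and it relies on Future.transform, which is available from Scala 2.12.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of FutureSupport.
object CollectAllDemo {
  // Turns a Future[A] into a Future[Try[A]] that always completes successfully.
  private def lift[A](f: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] =
    f.transform(Success(_))

  // Waits for every future, then splits the outcomes into (errors, values)
  // instead of failing on the first error. Order follows the input list.
  def collectAll[A](futures: List[Future[A]])(implicit ec: ExecutionContext): Future[(List[Throwable], List[A])] =
    Future.traverse(futures)(f => lift(f)).map { tries =>
      val errors = tries.collect { case Failure(ex) => ex }
      val values = tries.collect { case Success(v) => v }
      (errors, values)
    }

  // Small demonstration with made-up inputs.
  def run(): (List[Throwable], List[Int]) = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val inputs = List(
      Future.successful(1),
      Future.failed[Int](new RuntimeException("boom")),
      Future.successful(3)
    )
    Await.result(collectAll(inputs), 5.seconds)
  }
}
```

Here the caller decides what to do with the collected Throwables (log them, count them, rethrow the first one), which keeps logging out of the helper itself.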

Upvotes: 0

farmac

Reputation: 166

Maybe you could traverse the list and recover from throwables by returning an empty list and logging a failure reason? At the end you would flatten the whole sequence.

  val allElements: Future[List[Element]] = Future.traverse(individualFutures) {
    _.recover {
      case throwable: Throwable =>
        println(throwable.getMessage)
        Nil
    }
  }.map(_.flatten)
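To see the approach end to end, here is a small runnable sketch. Element is a hypothetical stand-in for the type in the question, the inputs are made up, and NonFatal is swapped in for the plain Throwable match so that only non-fatal errors are swallowed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RecoverDemo {
  // Hypothetical stand-in for the question's Element type.
  final case class Element(id: Int)

  def run(): List[Element] = {
    val individualFutures: List[Future[Seq[Element]]] = List(
      Future.successful(Seq(Element(1), Element(2))),
      Future.failed(new RuntimeException("lookup failed")),
      Future.successful(Seq(Element(3)))
    )

    // Each failed future is logged and replaced by an empty result,
    // so the combined future itself never fails because of it.
    val allElements: Future[List[Element]] =
      Future.traverse(individualFutures) {
        _.recover {
          case NonFatal(ex) =>
            println(s"Skipping failed future: ${ex.getMessage}")
            Nil
        }
      }.map(_.flatten)

    Await.result(allElements, 5.seconds)
  }
}
```

The result contains the elements of the two successful futures in their original order; the failure only shows up in the log output.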

Upvotes: 2
