Ali
Ali

Reputation: 267077

Multiple futures that may fail - returning both successes and failures?

I have a situation where I need to run a bunch of operations in parallel.

All operations have the same return value (say a Seq[String]).

Its possible that some of the operations may fail, and others successfully return results.

I want to return both the successful results, and any exceptions that happened, so I can log them for debugging.

Is there a built-in way, or easy way through any library (cats/scalaz) to do this, before I go and write my own class for doing this?

I was thinking of doing each operation in its own future, then checking each future, and returning a tuple of Seq[String] -> Seq[Throwable] where left value is the successful results (flattened / combined) and right is a list of any exceptions that occurred.

Is there a better way?

Upvotes: 1

Views: 589

Answers (4)

Mikhail Golubtsov
Mikhail Golubtsov

Reputation: 6653

I'd do it this way:

import scala.concurrent.{Future, ExecutionContext}
import scala.util.Success

def eitherify[A](f: Future[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = f.transform(tryResult => Success(tryResult.toEither))

def eitherifyF[A, B](f: A => Future[B])(implicit ec: ExecutionContext): A => Future[Either[Throwable, B]] = { a => eitherify(f(a)) }

// here we need some "cats" magic for `traverse` and `separate`
// instead of `traverse` you can use standard `Future.sequence`
// there is no analogue for `separate` in the standard library

import cats.implicits._

def myProgram[A, B](values: List[A], asyncF: A => Future[B])(implicit ec: ExecutionContext): Future[(List[Throwable], List[B])] = {
  val appliedTransformations: Future[List[Either[Throwable, B]]] = values.traverse(eitherifyF(asyncF))
  appliedTransformations.map(_.separate)
}

Upvotes: 0

Alexey Romanov
Alexey Romanov

Reputation: 170735

Using Await.ready, which you mention in a comment, generally loses most benefits from using futures. Instead you can do this just using the normal Future combinators. And let's do the more generic version, which works for any return type; flattening the Seq[String]s can be added easily.

def successesAndFailures[T](futures: Seq[Future[T]]): Future[(Seq[T], Seq[Throwable])] = {
  // first, promote all futures to Either without failures
  val eitherFutures: Seq[Future[Either[Throwable, T]]] = 
    futures.map(_.transform(x => Success(x.toEither)))
  // then sequence to flip Future and Seq
  val futureEithers: Future[Seq[Either[Throwable, T]]] = 
    Future.sequence(eitherFutures)
  // finally, Seq of Eithers can be separated into Seqs of Lefts and Rights
  futureEithers.map { seqOfEithers =>
    val (lefts, rights) = seqOfEithers.partition(_.isLeft)
    val failures = lefts.map(_.left.get)
    val successes = rights.map(_.right.get)
    (successes, failures)
  }
}

Scalaz and Cats have separate to simplify the last step.

The types can be inferred by the compiler, they are shown just to help you see the logic.

Upvotes: 2

jwvh
jwvh

Reputation: 51271

Calling value on your Future returns an Option[Try[T]]. If the Future has not completed then the Option is None. If it has completed then it's easy to unwrap and process.

if (myFutr.isCompleted)
  myFutr.value.map(_.fold( err: Throwable  => //log the error
                         , ss: Seq[String] => //process results
                         ))
else
 // do something else, come back later

Upvotes: 1

Dici
Dici

Reputation: 25950

Sounds like a good use-case for the Try idiom (it's basically similar to the Either monad).

Example of usage from the doc:

import scala.util.{Success, Failure}

val f: Future[List[String]] = Future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}

It actually does a little bit more than what you asked because it is fully asynchronous. Does it fit your use-case?

Upvotes: 1

Related Questions