Sydney

Reputation: 45

cats.sequence executes before all Futures complete even if lifted to EitherT

I am trying to execute a future-returning operation on multiple items, and if any one of the operations fails, I need to execute custom rollback logic. However, the critical bit here is that the rollback logic MUST execute after all of the non-failing operations have completed.

Example:

  def fn(thing: Thing): EitherT[Future, Error, Thing] = {
    if (someCondition) {
      EitherT(Future.successful(Right(thing)))
    } else {
      EitherT(Future.successful(Left(error)))
    }
  }

  def rollback(): EitherT[...] = {
    // general cleanup
  }

  List(things).map(fn).sequence.leftMap {
    case err =>
      rollback()
      err
  }

Based on the response to this question, I understand that, if my operation were returning a Future[Thing], the sequence would complete as soon as any operation failed. However, using EitherT, I would think that, because all the Futures return a success (of either Left or Right), the sequence should not complete until all the operations' futures complete.

However, what I am seeing is that my rollback function is executing while the non-failed operations are still executing.

Is there an alternative to sequence I should be using here?

Thanks!

Upvotes: 1

Views: 203

Answers (1)

Mario Galic

Reputation: 48400

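Futures run eagerly and keep going no matter what happens to the others. Sequencing a list of EitherT[Future, Error, Thing] short-circuits on the first Left it meets, so the combined result can complete (and the leftMap can fire) while the remaining Futures are still running. Perhaps change the signature of fn to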

def fn(thing: Thing): Future[Either[Error, Thing]]

and then sequence before constructing the EitherT, which should ensure that all the Futures complete before the next step in the chain:

EitherT { 
  List(things)
    .map(fn)         // List[Future[Either[Error, Thing]]]
    .sequence        // Future[List[Either[Error, Thing]]] (here all complete)
    .map(_.sequence) // Future[Either[Error, List[Thing]]]
}.leftMap { err =>
  rollback()
  err
}
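
For anyone who wants to try this out, here is a minimal runnable sketch of the same idea. It assumes Scala 2.13 with cats on the classpath; Thing, Error, the sleep timings, and the two counters are hypothetical stand-ins that are not in the question, and rollback is simplified to a synchronous side effect rather than something returning EitherT.

```scala
import cats.data.EitherT
import cats.implicits._

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceThenRollback {
  // Hypothetical stand-ins for the question's Thing and Error types.
  final case class Thing(id: Int)
  final case class Error(message: String)

  // Counts operations that have finished their work.
  val finished = new AtomicInteger(0)

  // Records how many operations had finished at the moment rollback ran.
  val finishedAtRollback = new AtomicInteger(-1)

  // Hypothetical operation: Thing(1) fails immediately, the others succeed after a delay.
  def fn(thing: Thing): Future[Either[Error, Thing]] = Future {
    if (thing.id != 1) Thread.sleep(200)
    finished.incrementAndGet()
    if (thing.id == 1) Left(Error(s"thing ${thing.id} failed")) else Right(thing)
  }

  // Simplified rollback (assumption): a synchronous side effect only.
  def rollback(): Unit = finishedAtRollback.set(finished.get())

  def run(things: List[Thing]): EitherT[Future, Error, List[Thing]] =
    EitherT {
      things
        .map(fn)         // List[Future[Either[Error, Thing]]]
        .sequence        // Future[List[Either[Error, Thing]]], completes after every Future
        .map(_.sequence) // Future[Either[Error, List[Thing]]], the first Left wins
    }.leftMap { err =>
      rollback()
      err
    }

  def main(args: Array[String]): Unit = {
    val result = Await.result(run(List(Thing(1), Thing(2), Thing(3))).value, 5.seconds)
    println(result)
    println(s"operations finished when rollback ran: ${finishedAtRollback.get()}")
  }
}
```

Even though Thing(1) fails right away, rollback should only run once the slower operations have also finished, because the outer sequence is over plain Futures and the Either short-circuiting happens afterwards. Note that if your real rollback returns an EitherT[Future, ...], calling it inside leftMap only starts it and discards the result; you would need leftSemiflatMap or similar to wait for it.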

Upvotes: 1
