Reputation: 45
I am trying to execute a future-returning operation on multiple items, and if any one of the operations fails, I need to execute custom rollback logic. However, the critical bit here is that the rollback logic MUST execute after all of the non-failing operations have completed.
Example:
def fn(thing: Thing): EitherT[Future, Error, Thing] = {
if (someCondition) {
EitherT(Future.successful(Right(thing)))
} else {
EitherT(Future.successful(Left(error)))
}
}
def rollback(): EitherT[...] = {
// general cleanup
}
List(things).map(fn).sequence.leftMap {
case err =>
rollback()
rrr
}
Based on the response to this question, I understand that, if my operation were returning a Future[Thing], the sequence would complete as soon as any operation failed. However, using EitherT, I would think that, because all the Futures return a success (of either Left or Right), the sequence should not complete until all the operations' futures complete.
However, what I am seeing is that my rollback
function is executing while the non-failed operations are still executing.
Is there an alternative to sequence I should be using here?
Thanks!
Upvotes: 1
Views: 203
Reputation: 48400
Futures run eagerly, and will keep going no matter what. Perhaps change fn
signature to
def fn(thing: Thing): Future[Either[Error, Thing]]
and then sequence
before constructing EitherT
which should ensure all the Future
s complete before next step in the chain
EitherT {
List(things)
.map(fn) // List[Future[Either[Error, Thing]]
.sequence // Future[List[Either[Error, Thing]]] (here all complete)
.map(_.sequence) // Future[Either[Error, List[Thing]]]
}.leftMap { err =>
rollback()
rrr
}
Upvotes: 1