fluency03

Reputation: 2697

How to discard other Futures if the critical Future is finished in Scala?

Let's say I have three remote calls in order to construct my page. One of them (X) is critical for the page and the other two (A, B) are just used to enhance the experience.

Because criticalFutureX is too important to be affected by futureA and futureB, I want the overall latency of all remote calls to be no more than that of X.

That means that once criticalFutureX finishes, I want to discard futureA and futureB.

val criticalFutureX = ...
val futureA = ...
val futureB = ...

// the overall latency of this for-comprehension depends on the longest among X, A and B
for {
  x <- criticalFutureX
  a <- futureA
  b <- futureB
} ...

In the above example, even though they are executed in parallel, the overall latency depends on the longest among X, A and B, which is not what I want.

Latencies:
X: |----------|
A: |---------------|
B: |---|

O: |---------------| (overall latency)

There is firstCompletedOf, but it cannot be used to explicitly say "once criticalFutureX has completed".

Is there something like the following?

val criticalFutureX = ...
val futureA = ...
val futureB = ...

for {
  x <- criticalFutureX
  a <- futureA // discard when criticalFutureX finished
  b <- futureB // discard when criticalFutureX finished
} ...

X: |----------|
A: |-----------... discarded
B: |---|

O: |----------| (overall latency)

Upvotes: 2

Views: 208

Answers (2)

Matthias Berndt

Reputation: 4587

This kind of task is very hard to achieve efficiently, reliably and safely with Scala standard library Futures. There is no way to interrupt a Future that hasn't completed yet, meaning that even if you choose to ignore its result, it will keep running and waste memory and CPU time. And even if there were a method to interrupt a running Future, there would be no way to ensure that the resources it allocated (network connections, open files etc.) are properly released.
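To make the "it will keep running" point concrete, here is a minimal sketch using only the standard library. The object and value names are made up for the illustration, and it assumes the global ExecutionContext. The caller gives up waiting, yet the body still runs to the end once it is allowed to proceed.

```scala
import java.util.concurrent.{CountDownLatch, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AbandonedFutureDemo {
  // Returns (caller timed out, body ran to the end, value produced by the body).
  def run(): (Boolean, Boolean, Int) = {
    val release  = new CountDownLatch(1)    // holds the body back until we say so
    val finished = new AtomicBoolean(false) // set by the body just before it returns

    val slow: Future[Int] = Future {
      release.await()
      finished.set(true)
      42
    }

    // Giving up on the result only stops the caller from waiting;
    // nothing is signalled to the task itself.
    val gaveUp =
      try { Await.result(slow, 50.millis); false }
      catch { case _: TimeoutException => true }

    // Let the body proceed: it still completes although we had stopped waiting.
    release.countDown()
    val lateResult = Await.result(slow, 5.seconds)

    (gaveUp, finished.get, lateResult)
  }
}
```

Ignoring a Future therefore only changes who is waiting, not what is executing.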

I would like to point out that the implementation given by Ivan Stanislavciuc has a bug: if the main Future fails, the promise is not completed at that point. It stays pending until the secondary Future finishes (or forever, if it never does), which is unlikely to be what you want.

I would therefore strongly suggest looking into modern concurrent effect systems like ZIO or cats-effect. These are not only safer and faster, but also much easier. Here's an implementation with ZIO that doesn't have this bug:

import zio.{Exit, Task}
import Function.tupled

def completeOnMain[A, B](
  main: Task[A], secondary: Task[B]): Task[(A, Exit[Throwable, B])] =
  (main.forkManaged zip secondary.forkManaged).use {
    tupled(_.join zip _.interrupt)
  }

Exit is a type that describes how the secondary task ended, i.e. by successfully returning a B, because of an error (of type Throwable), or due to interruption.

Note that this function can be given a much more sophisticated signature that tells you a lot more about what's going on, but I wanted to keep it simple here.

Upvotes: 0

Ivan Stanislavciuc

Reputation: 7275

You can achieve this with a Promise that is completed by whichever of the two Futures finishes first:


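  import scala.concurrent.{Future, Promise}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}

  def completeOnMain[A, B](main: Future[A], secondary: Future[B]): Future[Option[B]] = {
    val promise = Promise[Option[B]]()
    main.onComplete {
      // main failed: nothing is done here, so the promise keeps waiting for secondary
      case Failure(_) =>
      // main finished first: give up on secondary and report "no value"
      case Success(_) => promise.trySuccess(None)
    }
    secondary.onComplete {
      // the try* methods are no-ops if main has already completed the promise
      case Failure(exception) => promise.tryFailure(exception)
      case Success(value)     => promise.trySuccess(Option(value))
    }
    promise.future
  }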

Some testing code:

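  import scala.concurrent.Await
  import scala.concurrent.duration._

  private def runFor(first: Int, second: Int) = {

    // Simulates a remote call that takes `millis` milliseconds
    def run(millis: Int) = Future {
      Thread.sleep(millis)
      millis
    }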

    val start = System.currentTimeMillis()
    val combined = for {
      _ <- Future.unit
      f1 = run(first)
      f2 = completeOnMain(f1, run(second))
      r1 <- f1
      r2 <- f2
    } yield (r1, r2)

    val result = Await.result(combined, 10.seconds)
    println(s"It took: ${System.currentTimeMillis() - start}: $result")
  }

  runFor(3000, 4000)
  runFor(3000, 1000)

Produces:

It took: 3131: (3000,None)
It took: 3001: (3000,Some(1000))
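One caveat with completeOnMain as written: its Failure branch for main does nothing, so when main fails the returned Future stays pending until secondary finishes. A possible variant is sketched below; the name completeOnMainOrFail is hypothetical, and propagating main's exception is an assumption about the desired behaviour, not the only reasonable choice.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object CompleteOnMainSketch {
  // Hypothetical variant of completeOnMain: the only change is that a failure
  // of `main` also completes the promise, instead of being ignored.
  def completeOnMainOrFail[A, B](main: Future[A], secondary: Future[B])(
      implicit ec: ExecutionContext): Future[Option[B]] = {
    val promise = Promise[Option[B]]()
    main.onComplete {
      case Failure(exception) => promise.tryFailure(exception) // propagate main's failure
      case Success(_)         => promise.trySuccess(None)      // main won: stop waiting
    }
    secondary.onComplete {
      case Failure(exception) => promise.tryFailure(exception)
      case Success(value)     => promise.trySuccess(Option(value))
    }
    promise.future
  }
}
```

As with the original, the secondary Future is not cancelled. Only its result is ignored once main has completed.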

Upvotes: 4
