Simão Martins

Reputation: 1240

Scalaz task firstCompletedOf

I have two scalaz.concurrent.Tasks, each performing an HTTP request to a different server.

I want to compose them in a manner similar to Future.firstCompletedOf, that is: run them both in parallel and get the result of the first one that successfully completes.

Unfortunately, Task.gatherUnordered does not do what I want, since it runs every task to completion before returning the result.
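For reference, the behaviour being asked for looks roughly like this with standard-library Futures. This is only a minimal sketch: slowServer and fastServer are hypothetical stand-ins for the two HTTP requests, and the sleep durations are arbitrary. Note that Future.firstCompletedOf completes with whichever future finishes first, so a failure that arrives first is propagated as well.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedDemo {
  // Hypothetical stand-ins for the two HTTP requests (not from the question).
  // `blocking` lets the global pool add threads while these sleep.
  def slowServer: Future[String] = Future { blocking(Thread.sleep(2000)); "slow" }
  def fastServer: Future[String] = Future { blocking(Thread.sleep(200)); "fast" }

  // Completes as soon as either future completes; the other one keeps
  // running in the background and its result is discarded.
  def first: Future[String] =
    Future.firstCompletedOf(Seq(slowServer, fastServer))

  def main(args: Array[String]): Unit =
    println(Await.result(first, 5.seconds))
}
```

The goal is the same shape of result for Task: a single Task that finishes when the faster of the two does.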

Upvotes: 1

Views: 119

Answers (2)

P. Frolov

Reputation: 876

While using bimap is indeed correct, there's an alternative implementation:

import scalaz.concurrent.Task
import scalaz.Nondeterminism

def firstOf[A, B, C](ta: Task[A], tb: Task[B])(fa: A => C, fb: B => C): Task[C] = 
  Nondeterminism[Task].chooseAny(ta.map(fa), Seq(tb.map(fb))).map(_._1)

val task1 = Task { Thread.sleep(10000); 4 }
val task2 = Task { Thread.sleep(5000); "test" }
firstOf(task1, task2)(_.toString, identity).unsafePerformSync // test

Here I'm assuming that the non-deterministic choice is used to obtain equivalent values whose exact computation time is unknown. The function therefore takes the conversions fa and fb to a common type and runs them concurrently, as part of each task. This also helps when the conversion time itself is hard to predict: the result selected is the first one available after conversion, for example after extracting some data from an HTTP response. For simpler cases, a variant of the race function, with the mapping still performed in parallel, can be derived from firstOf as follows:

import scalaz.{\/, -\/, \/-}
def race[A, B](ta: Task[A], tb: Task[B]): Task[A \/ B] = firstOf(ta, tb)(-\/(_), \/-(_))

Upvotes: 1

I See Voices

Reputation: 842

Not sure how to do it in scalaz.concurrent natively, but this one works for me:

import scalaz.{\/, Nondeterminism}
import scalaz.concurrent.Task

// choose returns the winner's value paired with the still-running loser;
// bimap keeps only the value from whichever side finished first.
def race[A, B](t1: Task[A], t2: Task[B]): Task[A \/ B] =
  Nondeterminism[Task].choose(t1, t2).map {
    _.bimap(_._1, _._2)
  }
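A usage sketch for race, in case it helps. It assumes scalaz 7.2.x (where unsafePerformSync is available; older versions call it run), and the two tasks with their sleep durations are made up for illustration. Which side wins depends on timing, so the comments only describe the likely outcome.

```scala
import scalaz.{\/, -\/, \/-, Nondeterminism}
import scalaz.concurrent.Task

object RaceExample {
  def race[A, B](t1: Task[A], t2: Task[B]): Task[A \/ B] =
    Nondeterminism[Task].choose(t1, t2).map(_.bimap(_._1, _._2))

  // Hypothetical tasks standing in for the two HTTP requests.
  val slow: Task[Int]    = Task { Thread.sleep(2000); 4 }
  val fast: Task[String] = Task { Thread.sleep(200); "test" }

  def main(args: Array[String]): Unit =
    race(slow, fast).unsafePerformSync match {
      case -\/(n) => println(s"left finished first: $n")
      case \/-(s) => println(s"right finished first: $s") // expected with these delays
    }
}
```

The losing task is not cancelled here; it simply keeps running and its result is dropped.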

In fs2, the successor of scalaz.concurrent, this is fs2.async#race.

Upvotes: 1
