Reputation: 1240
I have two scalaz.concurrent.Task
s which are performing a HTTP request to different servers.
I want to compose them in a manner similar to Future.firstCompletedOf
, that is: run them both in parallel and get the result of the first one that successfully completes.
Unfortunately Task.gatherUnordered
does not what I want since it runs every task to completion before returning the result.
Upvotes: 1
Views: 119
Reputation: 876
While using bimap
is indeed correct, there's an alternate implementation:
import scalaz.concurrent.Task
import scalaz.Nondeterminism
def firstOf[A, B, C](ta: Task[A], tb: Task[B])(fa: A => C, fb: B => C): Task[C] =
Nondeterminism[Task].chooseAny(ta.map(fa), Seq(tb.map(fb))).map(_._1)
val task1 = Task { Thread.sleep(10000); 4 }
val task2 = Task { Thread.sleep(5000); "test" }
firstOf(task1, task2)(_.toString, identity).unsafePerformSync // test
Here I'm assuming that non-deterministic retrieval of results is used to obtain equivalent values for which exact computation time is unknown. So the function incorporates concurrently-performed conversions fa
and fb
to the common type. It's good in the cases when conversion time is difficult to compute as well - it selects first result after conversion, for example, some request data extraction in the case of HTTP. For simpler cases, variant of race
function that performs mapping in parallel is retrieved from firstOf
as follows:
def race[A, B](ta: Task[A], tb: Task[B]): Task[A \/ B] = firstOf(ta, tb)(-\/(_), \/-(_))
Upvotes: 1
Reputation: 842
Not sure how to do it in scalaz.concurrent
natively, but this one works for me:
import scalaz.Nondeterminism._
import scalaz.std.either.eitherInstance
import scalaz.syntax.bitraverse._
def race[A, B](t1: Task[A], t2: Task[B]): Task[A \/ B] = {
Nondeterminism[Task].choose(t1, t2).map {
_.bimap(_._1, _._2)
}
}
In fs2
- successor of scalaz.concurrent
- it is fs2.async#race
Upvotes: 1