Florian Baierl

Reputation: 2491

parMapN that finishes even when one program encounters an error

Using parMapN, multiple IOs can be executed in parallel, like this:

import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) })
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })

val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }

program.unsafeRunSync()

Sample output:

A1
C1
B1
A2
C2
B2
A3
C3
B3
A4
C4
B4
A5
B5
C5
A6
B6
C6
A7
B7
C7
A8
...

According to the documentation, unfinished tasks get cancelled if any of the IOs completes with a failure. What is the best way to change this behaviour, so that all IOs run to completion anyway?

In my case some of the IOs do not return anything (IO[Unit]) and I still want to make sure everything runs until it is finished or encounters an error.

Upvotes: 1

Views: 1583

Answers (2)

emerson moura

Reputation: 345

As far as I can tell, nothing in your example code raises an error. To see this behaviour you would need code like the following:

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO.raiseError[Unit](new Exception("boom"))
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })

Also, using attempt does not look ideal, since it changes the result type to an IO[Either[_,_]], which does not appear to be your intention, does it?
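If the Either in the result type is the concern, one option is to use attempt only internally and convert back to IO[Unit] once every branch has finished. The following is a minimal sketch, assuming cats-effect 2.x as in the question; runAll is a hypothetical helper name, not part of the library, and it re-raises only the first error it finds.

```scala
import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object RunAllThenFail {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical helper: runs all three IOs in parallel and waits for each one.
  // Because every branch is wrapped in attempt, no branch fails the parMapN itself,
  // so a failure in one branch does not cancel the others.
  def runAll(a: IO[Unit], b: IO[Unit], c: IO[Unit]): IO[Unit] =
    (a.attempt, b.attempt, c.attempt)
      .parMapN { (ra, rb, rc) => List(ra, rb, rc) }
      .flatMap { results =>
        // After everything has finished, re-raise the first error (if any)
        // so the caller still sees a plain IO[Unit].
        results.collectFirst { case Left(e) => e } match {
          case Some(e) => IO.raiseError[Unit](e)
          case None    => IO.unit
        }
      }
}
```

With this shape, callers keep working with IO[Unit]; the trade-off is that any errors after the first one are dropped unless you log or combine them yourself.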

Upvotes: 0

Florian Baierl

Reputation: 2491

Well, I found one possible answer shortly after posting my question. I am not sure if it is the best way to handle this, but defining my IOs like this works for me:

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) }).attempt
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) }).attempt
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) }).attempt
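For completeness, here is a rough sketch of how the attempt-wrapped IOs could be combined and their results inspected afterwards. It assumes cats-effect 2.x as in the question; the object name, the shortened IO bodies, and the failing ioB are illustrative only.

```scala
import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object AttemptParMapN {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // attempt turns IO[Unit] into IO[Either[Throwable, Unit]], so an exception
  // becomes a Left value instead of a failed IO.
  val ioA: IO[Either[Throwable, Unit]] = IO(println("A done")).attempt
  val ioB: IO[Either[Throwable, Unit]] = IO.raiseError[Unit](new Exception("boom")).attempt
  val ioC: IO[Either[Throwable, Unit]] = IO(println("C done")).attempt

  // None of the three IOs can fail any more, so parMapN waits for all of them.
  val program: IO[List[Either[Throwable, Unit]]] =
    (ioA, ioB, ioC).parMapN { (a, b, c) => List(a, b, c) }

  def main(args: Array[String]): Unit = {
    val results = program.unsafeRunSync()
    // Report each outcome in the order A, B, C.
    results.foreach {
      case Left(e)  => println(s"failed: ${e.getMessage}")
      case Right(_) => println("ok")
    }
  }
}
```

The order in which "A done" and "C done" are printed may vary between runs, but the result list always follows the order of the tuple.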

Upvotes: 0
