ZIO: Why is RejectedExecutionException being thrown?

I'm getting a RejectedExecutionException while running the following code:

import java.time.Instant

import zio._
import zio.blocking._
import zio.console._

override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
    val test = for {
      start  <- ZIO effect Instant.now.toEpochMilli
      loop   = for {
        f <- effectBlocking{Thread sleep 1000}
      } yield ()
      allFib <- loop.fork replicateM 2000
      _      <- Fiber joinAll allFib
      end    <- ZIO effect Instant.now.toEpochMilli
      _      <- putStrLn(s"Total time: ${end-start}")
    } yield ()

    test
      .catchAll(ZIO succeed _.getMessage)
      .map(_ => ExitCode.success)
  }

What am I doing wrong? When I execute the code below (which I think does more or less the same thing I'm trying to do with ZIO), using plain Scala Futures, everything works fine without any exception.

  import scala.collection.mutable.ListBuffer
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val tasks = new ListBuffer[Future[Unit]]
  
  for(_ <- 0 to 2000) {
    tasks += Future {
      Thread.sleep(1000)
    }
  }

  Await.result(Future sequence tasks, Duration.Inf)

With ZIO, I also tried to pass a dedicated ExecutorService, but it didn't work either.

Upvotes: 0

Views: 674

Answers (1)

Oleg Pyzhcov

Reputation: 7353

Modern effect types promote the use of two thread pools in an app: one for thread-blocking work (the recommended size is unbounded, but as we learned, ZIO caps it at 1000 threads), and one for computation (ZIO sizes it at 2 * CPU hyperthreads; cats-effect uses max(2, CPU hyperthreads)), which also handles things that may block, but for so little that it's not worth caring about, like println.

In ZIO, effectBlocking is how you tell the runtime to switch execution to that other pool. This is important for any non-trivial amount of blocking, because NOT doing so starves the compute pool, on which the rest of your app is running.

That's why ZIO.effect works - it uses the compute pool. For me, your code would finish in ~250s (4 cores means 8 threads, and we have 2000 tasks sleeping for a second each).
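
If you want to see the two pools with your own eyes, here is a minimal sketch (the object name is mine, and the thread-name prefixes in the comments are just what ZIO 1.x typically uses, so treat them as an assumption) that prints which thread each variant runs on:

import zio._
import zio.blocking._
import zio.console._

object PoolDemo extends zio.App {
  override def run(args: List[String]): URIO[ZEnv, ExitCode] = {
    // Runs on the compute pool (threads usually named zio-default-async-*).
    val onCompute  = ZIO.effectTotal(Thread.currentThread.getName)
    // Runs on the blocking pool (threads usually named zio-default-blocking-*).
    val onBlocking = effectBlocking(Thread.currentThread.getName)

    (for {
      c <- onCompute
      b <- onBlocking
      _ <- putStrLn(s"compute pool thread:  $c")
      _ <- putStrLn(s"blocking pool thread: $b")
    } yield ()).exitCode
  }
}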


Now, I'm going to assume that you replaced the wrong thread pool with a cached one, since this works:

import java.time.Instant
import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

import zio._
import zio.blocking._
import zio.console._
import zio.internal.Executor

object ZioBlockingTest extends zio.App {

  val cachedBlocker = ZLayer.succeed(new Blocking.Service {
    override def blockingExecutor: Executor =
      Executor.fromExecutionContext(Int.MaxValue)(
        ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
      )
  })


  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = {
    val test = for {
      start  <- ZIO effect Instant.now.toEpochMilli
      loop   = for {
        f <- effectBlocking{Thread sleep 1000}
      } yield ()
      allFib <- loop.fork replicateM 2000
      _      <- Fiber joinAll allFib
      end    <- ZIO effect Instant.now.toEpochMilli
      _      <- putStrLn(s"Total time: ${end-start}")
    } yield ()

    test
      .catchAll(ZIO succeed _.getMessage)
      .map(_ => ExitCode.success)
  }.provideSomeLayer[ZEnv](cachedBlocker) // <-- override the pool used for effectBlocking
}

This finishes in slightly under 2s for me; the extra time on top of the 1s sleep is the overhead of spinning up and joining 2000 actual OS threads.


P.S. You are right about thread pools having "mailboxes", but the mailboxes themselves can differ. Computation pools usually use a java.util.concurrent.LinkedBlockingQueue with maximum capacity (Int.MaxValue), so work can always be postponed until a thread is free. Blocking pools use a j.u.c.SynchronousQueue, which effectively has a capacity of 0, so it's "start immediately or die". It can probably be considered a bug in ZIO that it uses both a limited thread pool and a SynchronousQueue for blocking tasks.
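
To make the queue difference concrete, here is a plain-JVM sketch (my own object name and pool sizes, not ZIO's actual internals) showing why a SynchronousQueue-backed pool rejects work while a LinkedBlockingQueue-backed one just queues it:

import java.util.concurrent.{LinkedBlockingQueue, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object QueueDemo extends App {
  val sleepTask: Runnable = () => Thread.sleep(1000)

  // "Compute-style" pool: a few threads plus an effectively unbounded
  // LinkedBlockingQueue, so surplus tasks just wait and execute() never throws.
  val computeLike =
    new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())

  // "Blocking-style" pool as described above: capped at 1000 threads plus a
  // SynchronousQueue, which cannot hold tasks. Once 1000 threads are busy,
  // the next execute() fails with RejectedExecutionException.
  val blockingLike =
    new ThreadPoolExecutor(0, 1000, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

  (1 to 2000).foreach(_ => computeLike.execute(sleepTask))  // fine: surplus tasks queue up
  (1 to 2000).foreach(_ => blockingLike.execute(sleepTask)) // throws after ~1000 submissions

  computeLike.shutdown()
  blockingLike.shutdown()
}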


P.P.S. Futures use a thread pool too, but you've probably imported ExecutionContext.Implicits.global. It has a bit of extra magic that lets you tell the pool "I'm blocking here, maybe add some threads":

  import scala.concurrent.blocking

  for(_ <- 0 to 2000) {
    tasks += Future {
      blocking { Thread.sleep(1000) }
    }
  }

The exact limits of the global EC are configurable with VM properties; the default for additional threads in case of blocking is 256. That makes the example code finish in a few seconds (still slower than ZIO with a proper cached pool).
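
For reference, these are the VM properties I mean (the names below are my recollection of the scala.concurrent defaults, so double-check them against your Scala version):

  // Flags for tuning scala.concurrent.ExecutionContext.Implicits.global:
  //   -Dscala.concurrent.context.numThreads=x1        // base pool size; "xN" means N * available cores
  //   -Dscala.concurrent.context.maxExtraThreads=256  // extra threads granted to blocking { ... } sections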

Lazy effect types don't rely on this sort of magic: if you use the two pools right, it's actually more performant, and it's easier to use different pools since they aren't passed around as implicits everywhere.

Upvotes: 2
