Kamil Kloch
Kamil Kloch

Reputation: 333

Monix Task.sleep and single thread execution

I am trying to comprehend Task scheduling principles in Monix. The following code (source: https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3) produces only '1's, as expected.

  val s1: Scheduler = Scheduler(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
    ExecutionModel.SynchronousExecution)

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)

  val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled

  prog.runToFuture(s1)

  // Output:
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // ...

When we add Task.sleep to the repeat method

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >>
      Task.sleep(1.millis) >> repeat(id)

the output changes to

// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...

Both tasks are now executed concurently on a single thread! Nice :) Some cooperative yielding has kicked in. What happenend here exactly? Thanks :)

EDIT: same happens with Task.shift instead of Task.sleep.

Upvotes: 3

Views: 699

Answers (2)

atl
atl

Reputation: 326

To expand on Markus's answer.

As a mental model (for illustration purpose), you can imagine the thread pool like a stack. Since, you only have one executor thread pool, it'll try to run repeat1 first and then repeat2.

Internally, everything is just a giant FlatMap. The run loop will schedule all the tasks based on the execution model.

What happens is, sleep schedules a runnable to the thread pool. It pushes the runnable (repeat1) to the top of the stack, hence giving the chance for repeat2 to run. The same thing will happen with repeat2.

Note that, by default Monix's execution model will do an async boundary for every 1024 flatmap.

Upvotes: 0

Markus Appel
Markus Appel

Reputation: 3238

I'm not sure if that's the answer you're looking for, but here it goes:

Allthough naming suggests otherwise, Task.sleep cannot be compared to more conventional methods like Thread.sleep.

Task.sleep does not actually run on a thread, but instead simply instructs the scheduler to run a callback after the elapsed time.

Here's a little code snippet from monix/TaskSleep.scala for comparison:

[...]

implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)

c := ctx.scheduler.scheduleOnce(
  timespan.length,
  timespan.unit,
  new SleepRunnable(ctx, cb)
)

[...]

private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {

  def run(): Unit = {
    ctx.connection.pop()
    // We had an async boundary, as we must reset the frame
    ctx.frameRef.reset()
    cb.onSuccess(())
  }
}

[...]

During the period before the callback (here: cb) is executed, your single-threaded scheduler (here: ctx.scheduler) can simply use his thread for whatever computation is queued next.

This also explains why this approach is preferable, as we don't block threads during the sleep intervals - wasting less computation cycles.

Hope this helps.

Upvotes: 1

Related Questions