Reputation: 333
I am trying to comprehend Task scheduling principles in Monix. The following code (source: https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3) produces only '1's, as expected.
val s1: Scheduler = Scheduler(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
ExecutionModel.SynchronousExecution)
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)
val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled
prog.runToFuture(s1)
// Output:
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// ...
When we add Task.sleep
to the repeat
method
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >>
Task.sleep(1.millis) >> repeat(id)
the output changes to
// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...
Both tasks are now executed concurently on a single thread! Nice :) Some cooperative yielding has kicked in. What happenend here exactly? Thanks :)
EDIT: same happens with Task.shift
instead of Task.sleep
.
Upvotes: 3
Views: 699
Reputation: 326
To expand on Markus's answer.
As a mental model (for illustration purpose), you can imagine the thread pool like a stack. Since, you only have one executor thread pool, it'll try to run repeat1
first and then repeat2
.
Internally, everything is just a giant FlatMap
. The run loop will schedule all the tasks based on the execution model.
What happens is, sleep
schedules a runnable to the thread pool. It pushes the runnable (repeat1
) to the top of the stack, hence giving the chance for repeat2
to run. The same thing will happen with repeat2
.
Note that, by default Monix's execution model will do an async boundary for every 1024 flatmap.
Upvotes: 0
Reputation: 3238
I'm not sure if that's the answer you're looking for, but here it goes:
Allthough naming suggests otherwise, Task.sleep
cannot be compared to more conventional methods like Thread.sleep
.
Task.sleep
does not actually run on a thread, but instead simply instructs the scheduler to run a callback after the elapsed time.
Here's a little code snippet from monix/TaskSleep.scala
for comparison:
[...]
implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)
c := ctx.scheduler.scheduleOnce(
timespan.length,
timespan.unit,
new SleepRunnable(ctx, cb)
)
[...]
private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {
def run(): Unit = {
ctx.connection.pop()
// We had an async boundary, as we must reset the frame
ctx.frameRef.reset()
cb.onSuccess(())
}
}
[...]
During the period before the callback (here: cb
) is executed, your single-threaded scheduler (here: ctx.scheduler
) can simply use his thread for whatever computation is queued next.
This also explains why this approach is preferable, as we don't block threads during the sleep intervals - wasting less computation cycles.
Hope this helps.
Upvotes: 1