Reputation: 777
I am trying to understand how cats effect Cancelable
works. I have the following minimal app, based on the documentation
import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._
object Main extends IOApp {
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable { cb =>
val r = new Runnable {
def run() =
cb(Right(()))
}
val f = sc.schedule(r, d.length, d.unit)
// Returning the cancellation token needed to cancel
// the scheduling and release resources early
val mayInterruptIfRunning = false
IO(f.cancel(mayInterruptIfRunning)).void
}
}
override def run(args: List[String]): IO[ExitCode] = {
val scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor()
for {
x <- delayedTick(1.second)(scheduledExecutorService)
_ <- IO(println(s"$x"))
} yield ExitCode.Success
}
}
When I run this:
❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()
The program just hangs at this point. I have many questions:
mayInterruptIfRunning = false
? Isn't the whole point of cancellation to interrupt a running task?ScheduledExecutorService
? I did not see examples in the docs.()
(then unexpectedly hangs). What if I wanted to return something else? For example, let's say I wanted to return a string, the result of some long-running computation. How would I extract that value from IO.cancelable
? The difficulty, it seems, is that IO.cancelable
returns the cancelation operation, not the return value of the process to be cancelled.Pardon the long post but this is my build.sbt
:
name := "cats-effect-tutorial"
version := "1.0"
fork := true
scalaVersion := "2.12.8"
libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()
scalacOptions ++= Seq(
"-feature",
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification")
Upvotes: 1
Views: 1516
Reputation: 568
You need explicitly terminate
the executor at the end, as it is not managed by Scala or Cats runtime, it wouldn't exit by itself, that's why your App hands up instead of exit immediately.
mayInterruptIfRunning = false
gracefully terminates a thread if it is running. You can set it as true
to forcely kill it, but it is not recommanded.
You have many way to create a ScheduledExecutorService
, it depends on need. For this case it doesn't matter, but the question 1.
You can return anything from the Cancelable IO by call cb(Right("put your stuff here"))
, the only thing blocks you to retrieve the return A
is when your cancellation works. You wouldn't get anything if you stop it before it gets to the point. Try to return IO(f.cancel(mayInterruptIfRunning)).delayBy(FiniteDuration(2, TimeUnit.SECONDS)).void
, you will get what you expected. Because 2 seconds > 1 second
, your code gets enough time to run before it has been cancelled.
Upvotes: 0
Reputation: 11
you need shutdown the ScheduledExecutorService
, Try this
Resource.make(IO(Executors.newSingleThreadScheduledExecutor))(se => IO(se.shutdown())).use {
se =>
for {
x <- delayedTick(5.second)(se)
_ <- IO(println(s"$x"))
} yield ExitCode.Success
}
Upvotes: 1
Reputation: 777
I was able to find an answer to these questions although there are still some things that I don't understand.
Why does the program hang instead of terminating after 1 second?
For some reason, Executors.newSingleThreadScheduledExecutor()
causes things to hang. To fix the problem, I had to use Executors.newSingleThreadScheduledExecutor(new Thread(_))
. It appears that the only difference is that the first version is equivalent to Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory())
, although nothing in the docs makes it clear why this is the case.
Why do we set
mayInterruptIfRunning = false
? Isn't the whole point of cancellation to interrupt a running task?
I have to admit that I do not understand this entirely. Again, the docs were not especially clarifying on this point. Switching the flag to true
does not seem to change the behavior at all, at least in the case of Ctrl-c
interrupts.
Is this the recommended way to define the ScheduledExecutorService? I did not see examples in the docs.
Clearly not. The way that I came up with was loosely inspired by this snippet from the cats effect source code.
This program waits 1 second, and then returns
()
(then unexpectedly hangs). What if I wanted to return something else? For example, let's say I wanted to return a string, the result of some long-running computation. How would I extract that value fromIO.cancelable
? The difficulty, it seems, is thatIO.cancelable
returns the cancelation operation, not the return value of the process to be cancelled.
The IO.cancellable { ... }
block returns IO[A]
and the callback cb
function has type Either[Throwable, A] => Unit
. Logically this suggests that whatever is fed into the cb
function is what the IO.cancellable expression will returned (wrapped in IO
). So to return the string "hello"
instead of ()
, we rewrite delayedTick
:
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]
implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
val r = new Runnable {
def run() =
cb(Right("hello")) // Note "hello" instead of ()
}
val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
IO(f.cancel(true))
}
}
Upvotes: 0