ethanabrooks
ethanabrooks

Reputation: 777

Understanding cats effect `Cancelable `

I am trying to understand how cats effect Cancelable works. I have the following minimal app, based on the documentation

import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._

object Main extends IOApp {
  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[Unit] = {

    IO.cancelable { cb =>
      val r = new Runnable {
        def run() =
          cb(Right(()))
      }
      val f = sc.schedule(r, d.length, d.unit)

      // Returning the cancellation token needed to cancel
      // the scheduling and release resources early
      val mayInterruptIfRunning = false
      IO(f.cancel(mayInterruptIfRunning)).void
    }
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val scheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor()
    for {
      x <- delayedTick(1.second)(scheduledExecutorService)
      _ <- IO(println(s"$x"))
    } yield ExitCode.Success
  }
}

When I run this:

❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()

The program just hangs at this point. I have many questions:

  1. Why does the program hang instead of terminating after 1 second?
  2. Why do we set mayInterruptIfRunning = false? Isn't the whole point of cancellation to interrupt a running task?
  3. Is this the recommended way to define the ScheduledExecutorService? I did not see examples in the docs.
  4. This program waits 1 second, and then returns () (then unexpectedly hangs). What if I wanted to return something else? For example, let's say I wanted to return a string, the result of some long-running computation. How would I extract that value from IO.cancelable? The difficulty, it seems, is that IO.cancelable returns the cancelation operation, not the return value of the process to be cancelled.

Pardon the long post but this is my build.sbt:

name := "cats-effect-tutorial"

version := "1.0"

fork := true

scalaVersion := "2.12.8"

libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()

scalacOptions ++= Seq(
  "-feature",
  "-deprecation",
  "-unchecked",
  "-language:postfixOps",
  "-language:higherKinds",
  "-Ypartial-unification")

Upvotes: 1

Views: 1516

Answers (3)

user3593261
user3593261

Reputation: 568

  1. You need explicitly terminate the executor at the end, as it is not managed by Scala or Cats runtime, it wouldn't exit by itself, that's why your App hands up instead of exit immediately.

  2. mayInterruptIfRunning = false gracefully terminates a thread if it is running. You can set it as true to forcely kill it, but it is not recommanded.

  3. You have many way to create a ScheduledExecutorService, it depends on need. For this case it doesn't matter, but the question 1.

  4. You can return anything from the Cancelable IO by call cb(Right("put your stuff here")), the only thing blocks you to retrieve the return A is when your cancellation works. You wouldn't get anything if you stop it before it gets to the point. Try to return IO(f.cancel(mayInterruptIfRunning)).delayBy(FiniteDuration(2, TimeUnit.SECONDS)).void, you will get what you expected. Because 2 seconds > 1 second, your code gets enough time to run before it has been cancelled.

Upvotes: 0

thinkiny
thinkiny

Reputation: 11

you need shutdown the ScheduledExecutorService, Try this

Resource.make(IO(Executors.newSingleThreadScheduledExecutor))(se => IO(se.shutdown())).use {
      se =>
        for {
          x <- delayedTick(5.second)(se)
          _ <- IO(println(s"$x"))
        } yield ExitCode.Success
    }

Upvotes: 1

ethanabrooks
ethanabrooks

Reputation: 777

I was able to find an answer to these questions although there are still some things that I don't understand.

Why does the program hang instead of terminating after 1 second?

For some reason, Executors.newSingleThreadScheduledExecutor() causes things to hang. To fix the problem, I had to use Executors.newSingleThreadScheduledExecutor(new Thread(_)). It appears that the only difference is that the first version is equivalent to Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), although nothing in the docs makes it clear why this is the case.

Why do we set mayInterruptIfRunning = false? Isn't the whole point of cancellation to interrupt a running task?

I have to admit that I do not understand this entirely. Again, the docs were not especially clarifying on this point. Switching the flag to true does not seem to change the behavior at all, at least in the case of Ctrl-c interrupts.

Is this the recommended way to define the ScheduledExecutorService? I did not see examples in the docs.

Clearly not. The way that I came up with was loosely inspired by this snippet from the cats effect source code.

This program waits 1 second, and then returns () (then unexpectedly hangs). What if I wanted to return something else? For example, let's say I wanted to return a string, the result of some long-running computation. How would I extract that value from IO.cancelable? The difficulty, it seems, is that IO.cancelable returns the cancelation operation, not the return value of the process to be cancelled.

The IO.cancellable { ... } block returns IO[A] and the callback cb function has type Either[Throwable, A] => Unit. Logically this suggests that whatever is fed into the cb function is what the IO.cancellable expression will returned (wrapped in IO). So to return the string "hello" instead of (), we rewrite delayedTick:

  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]

    implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
    IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
      val r = new Runnable {
        def run() =
          cb(Right("hello")) // Note "hello" instead of ()
      }
      val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
      IO(f.cancel(true))
    }
  }

Upvotes: 0

Related Questions