Georg Heiler
Georg Heiler

Reputation: 17676

Monix task handle failure for list of futures

How can I handle the failure during the asynchronous execution of the task? I.e. at least print the stack trace and shut down. The code below seems to wait forever for input > 5

val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

edit

trying:

futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

will not stop the computation. How can I log the stack trace and cancel the ongoing computations and stop?

Upvotes: 1

Views: 638

Answers (2)

atl
atl

Reputation: 326

A more idiomatic approach would be to use Observable instead of Task since it is dealing with list of data (I'm assuming that's the use case since it is shown in the question).

 val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
    else Task.delay(println(i)) // Or write to file in your case
  )
  .completedL
  .runToFuture


obs
  .recover {
    case NonFatal(e) => println("Error")
  }

Alternatively, you can also signal the error with Either which leads to better type safety, since you'll need to handle the Either result.

val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.pure(Left("Error"))
    else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
  )
  .takeWhileInclusive(_.isRight) // will also emit the failing result
  .lastL
  .runToFuture


obs.map {
  case Left(err) => println("There's an error")
  case _ => println("Completed successfully")
}

Upvotes: 1

muhuk
muhuk

Reputation: 16085

This problems has 2 parts:

  • Making the tasks cancellable.
  • Cancelling the siblings when one task fails.

Making the task cancellable

Monix has BooleanCancelable which would allow you to set the result of isCancelled to true when the cancel is called.

cancel also needs to call Thread.interrupt to wake it up when Thread.sleep is running. Otherwise the sleep will run through its course. However this will throw InterruptedException within your task. That needs to be handled.

Cancelling the siblings

There is CompositeCancelable. It seems like the use case for CompositeCancellable it for calling cancel from the parent task. So once the CompositeCancellable is built (ie. all tasks are constructred):

  • Either a reference of this must be made available to each task, so the failing task can call cancel on it. (Note that this is a sort of circular reference, better avoided)
  • Or parent task (or code) is made aware when a sub-task fails and it calls cancel. (This would avoid circular reference)

Another way to notify sibling tasks is to use an AtomicBoolean and frequently check it (sleep 10 milliseconds instead of 1000). When one task fails it would then set this boolean so other tasks can stop their execution. This of course does not involve Cancellable. (And it's kind of a hack, better to use monix scheduler)

Note

Is it a good idea to call Thread.sleep within a Task? I think this would prevent another task to use that thread. I think using scheduler to add delays and composing those sub-tasks is the way to utilize the thread-pool most effectively.

Upvotes: 0

Related Questions