Reputation: 17676
How can I handle the failure during the asynchronous execution of the task? I.e. at least print the stack trace and shut down. The code below seems to wait forever for input > 5
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i:Int) = Task.eval {
Try{
Thread.sleep(1000)
val result = i + 1
if(result > 5){
throw new Exception("asdf")
}
// i.e. write to file, that's why unit is returned
println(result) // Effect
"Result"
}
}
val futures = things.map(e=> t(e))
futures.foreach(_.runToFuture)
trying:
futures.foreach(_.runToFuture.onComplete {
case Success(value) =>
println(value)
case Failure(ex) =>
System.err.println(ex)
System.exit(1)
})
will not stop the computation. How can I log the stack trace and cancel the ongoing computations and stop?
Upvotes: 1
Views: 638
Reputation: 326
A more idiomatic approach would be to use Observable
instead of Task
since it is dealing with list of data (I'm assuming that's the use case since it is shown in the question).
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
else Task.delay(println(i)) // Or write to file in your case
)
.completedL
.runToFuture
obs
.recover {
case NonFatal(e) => println("Error")
}
Alternatively, you can also signal the error with Either
which leads to better type safety, since you'll need to handle the Either
result.
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.pure(Left("Error"))
else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
)
.takeWhileInclusive(_.isRight) // will also emit the failing result
.lastL
.runToFuture
obs.map {
case Left(err) => println("There's an error")
case _ => println("Completed successfully")
}
Upvotes: 1
Reputation: 16085
This problems has 2 parts:
Monix has BooleanCancelable which would allow you to set the result of isCancelled
to true
when the cancel
is called.
cancel
also needs to call Thread.interrupt
to wake it up when Thread.sleep
is running. Otherwise the sleep
will run through its course. However this will throw InterruptedException within your task. That needs to be handled.
There is CompositeCancelable. It seems like the use case for CompositeCancellable
it for calling cancel
from the parent task. So once the CompositeCancellable
is built (ie. all tasks are constructred):
cancel
on it. (Note that this is a sort of circular reference, better avoided)cancel
. (This would avoid circular reference)Another way to notify sibling tasks is to use an AtomicBoolean and frequently check it (sleep 10 milliseconds instead of 1000). When one task fails it would then set this boolean so other tasks can stop their execution. This of course does not involve Cancellable
. (And it's kind of a hack, better to use monix scheduler)
Is it a good idea to call Thread.sleep
within a Task
? I think this would prevent another task to use that thread. I think using scheduler to add delays and composing those sub-tasks is the way to utilize the thread-pool most effectively.
Upvotes: 0