Reputation: 800
I am new to Scala and I am practicing on the Futures lib by creating some retry schemes. Doing so I got the following piece of code:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Retries extends App {
var retries = 0
def resetRetries(): Unit = retries = 0
def calc() = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException("This failed")
}
def fCalc(): Future[Int] = Future(calc())
resetRetries()
val ff = fCalc() // 0 - should fail
.fallbackTo(fCalc()) // 1 - should fail
.fallbackTo(fCalc()) // 2 - should fail
.fallbackTo(fCalc()) // 3 - should fail
.fallbackTo(fCalc()) // 4 - should be a success
Await.ready(ff, 10.second)
println(ff.isCompleted)
println(ff.value)
}
Every time I run this code I am getting different results. Samples of the results i am getting are the following
Output 1
I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
Output 2
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))
Ouput 3
I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
It's not always the case that the results would be alternating between success and failure. There might be more than a couple failed runs until a successful one appears.
To my understanding, there should be only 4 logs of the "I am thread x This is going to fail. Retry count x" and these should be the following:
I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4
Not necessarily in this order - since I do not know how the Scala threading model works exactly - but you get my point. Nevertheless, I am getting this nondeterministic output I cannot come to grips with.So... my question is: Where does this non deterministic output come from?
I would like to mention that the following retry mechanism yields consistently the same results:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Retries extends App {
var retries = 0
def resetRetries(): Unit = retries = 0
def calc() = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException("This failed")
}
def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }
resetRetries()
val retriableFuture: Future[Future[Int]] = retry(calc())(5)
Await.ready(retriableFuture, 10 second)
println(retriableFuture.isCompleted)
println(retriableFuture.value)
}
Output
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))
While if I reduce the number of retries (retry(calc())(3)
), the result is a failed future as expected
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
Upvotes: 1
Views: 423
Reputation: 23788
Although technically @Tim is correct I don't think he really answers this question.
I believe the real source of your confusion is your misunderstanding of what the constructions:
f.fallbackTo(Future(calc()))
does. And how it is different from
f.recoverWith({ case _ => Future(calc())})
There are two important distinctions:
In the fallbackTo
case the Future(calc())
is created immediately and thus (almost) immediately starts execution of the calc()
. Thus the original future and the fallback future are run concurrently. In the case of the recoverWith
the fallback future is created only after the failure of the original future. This difference affects the logging order. Also this means that the access to the var retries
is concurrent and thus you might see the case when all threads actually fail because some updates to retries
are lost.
Another tricky point is that fallbackTo
is documented as (highlighting is mine)
Creates a new future which holds the result of this future if it was completed successfully, or, if not, the result of the that future if that is completed successfully. If both futures are failed, the resulting future holds the throwable object of the first future.
This difference does not really affect your example because the exception you throw in all failed attempts is the same but it might have affected the result if they were different. For example if you modify your code to:
def calc(attempt: Int) = if (retries > 3) 10 else {
retries += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException(s"This failed $attempt")
}
def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))
val ff = fCalc(1) // 0 - should fail
.fallbackTo(fCalc(2)) // 1 - should fail
.fallbackTo(fCalc(3)) // 2 - should fail
.fallbackTo(fCalc(4)) // 3 - should fail
.fallbackTo(fCalc(5)) // 4 - should be a success
then you should get either of those two results
Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))
and never any other "failed" value.
Note that here I explicitly pass the attempt
to not hit the race condition on retries
.
Answer to more comments (Jan 28)
The reason I explicitly pass attempt
in my previous example is that it is the simplest way to ensure that the IllegalArgumentException
created by the logically first calc
will get 1
as its value under all (even not very realistic) thread schedules.
If you just interested in having all logs to have different values there is a much easier way: use a local variable!
def calc() = {
val retries = atomicRetries.getAndIncrement()
if (retries > 3) 10
else {
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
throw new IllegalArgumentException(s"This failed $retries")
}
}
In this way you avoid classical TOCTOU problem.
Upvotes: 4
Reputation: 800
This is what eventually worked for me:
(The following code for calc()
method adequately tackles the issues regarding logging duplication and futures' non deterministic results)
var time = 0
var resetTries = time = 0
def calc() = this.synchronized {
if (time > 3) 10 else {
time += 1
println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
throw new IllegalStateException(("not yet"))
}
}
No AtomicInteger
required - makes things even more complicated in my opinion. A synchronised
wrapper is what is needed.
I must highlight the fact that this is just for demonstration purposes and using such a design in a production code might not be the best idea (blocking calls to calc
method). One should use the recoverWith
implementation instead.
Thanks to @SergGr , @Tim and @MichalPolitowksi for their help
Upvotes: 0
Reputation: 27356
This is not a Scala problem but a more general multi-threading problem with the value retries
. You have multiple threads reading and writing this value without any synchronisation, so you can't predict when each thread will run or what value it will see.
It looks like the specific problem is that you are testing retries
and then updating it later. It is possible that all four threads test the value before any of them updates it. In this case they would all see 0
and throw an error.
The solution is to turn retries
into an AtomicInteger
and use getAndIncrement
. This will atomically retrieve the value and increment it, so each thread will see the appropriate value.
Update following comments: The other answer has explained why it is that multiple threads are being started at the same time so I won't repeat it here. With multiple threads running in parallel the order of the logging is always going to be non-deterministic.
Upvotes: 2