Reputation: 7
I want to process some data in an IO thread and need to access the final data after the processing is complete. This is my Process
sample code:
private class Process {
private val scope = CoroutineScope(Dispatchers.IO)
private val value: AtomicInteger = AtomicInteger(0)
private val mutex = Mutex()
/**
* Do background task to change value.
*/
fun doIt() {
scope.launch {
println("SCOPE - start at ${System.currentTimeMillis()}")
mutex.withLock {
println("Process - doIt start delay ${System.currentTimeMillis()}")
delay(10)
value.incrementAndGet()
println("Process - value increase to $value")
}
println("SCOPE - end at ${System.currentTimeMillis()}")
}
}
/**
* Get value sync.
*/
fun getValue(): Int {
return runBlocking(scope.coroutineContext) {
mutex.withLock {
println("Process - getValue ${System.currentTimeMillis()}")
value.get()
}
}
}
/**
* Reset value.
*/
fun reset() {
runBlocking(scope.coroutineContext) {
mutex.withLock {
value.set(0)
println("Process - value reset to ${value.get()}")
}
}
}
}
And the Process
usage:
fun main() {
val process = Process()
repeat(100) {
println("-----------------------------")
println("Start - ${System.currentTimeMillis()}")
process.reset()
process.doIt()
val result = run {
val count = process.getValue()
if (count == 1) {
println("Count - $count ${System.currentTimeMillis()}")
true
} else {
println("Mutex failed in $it with $count")
false
}
}
if (!result) return@main
}
}
My expected result log is 100 snippets like this:
-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035
But the test is always interrupted when it reaches the third or fourth loop:
-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035
-----------------------------
Start - 1645717972035
Process - value reset to 0
SCOPE - start at 1645717972036
Process - doIt start delay 1645717972036
Process - value increase to 1
SCOPE - end at 1645717972049
Process - getValue 1645717972049
Count - 1 1645717972050
-----------------------------
Start - 1645717972050
Process - value reset to 0
SCOPE - start at 1645717972050
Process - getValue 1645717972051
Process - doIt start delay 1645717972051
Mutex failed in 2 with 0
Can anyone tell me how I should synchronize to get the results of asynchronous code that has already started executing under this case?
Upvotes: 0
Views: 325
Reputation: 9944
Have you considered using a Deferred
result in place of your AtomicInteger
?
var result: Deferred<Int> = CompletableDeferred(0)
fun doIt() {
result = async { /* get new value */ }
}
fun getValue() = runBlocking { result.await() }
fun reset() {
result = CompletableDeferred(0)
}
You might need to introduce some more complexity if you need to handle cancellation of the previous job each time doIt
is called.
Upvotes: 2