Alex Liu

Reputation: 7

How can I synchronously read the result of an asynchronous function in Kotlin coroutines using a Mutex?

I want to process some data on an IO thread and read the final value once processing is complete. Here is my sample Process class:

private class Process {
    private val scope = CoroutineScope(Dispatchers.IO)
    private val value: AtomicInteger = AtomicInteger(0)
    private val mutex = Mutex()

    /**
     * Do background task to change value.
     */
    fun doIt() {
        scope.launch {
            println("SCOPE - start at ${System.currentTimeMillis()}")
            mutex.withLock {
                println("Process - doIt start delay ${System.currentTimeMillis()}")
                delay(10)
                value.incrementAndGet()
                println("Process - value increase to $value")
            }
            println("SCOPE - end at ${System.currentTimeMillis()}")
        }
    }

    /**
     * Get value sync.
     */
    fun getValue(): Int {
        return runBlocking(scope.coroutineContext) {
            mutex.withLock {
                println("Process - getValue ${System.currentTimeMillis()}")
                value.get()
            }
        }
    }

    /**
     * Reset value.
     */
    fun reset() {
        runBlocking(scope.coroutineContext) {
            mutex.withLock {
                value.set(0)
                println("Process - value reset to ${value.get()}")
            }
        }
    }
}

And this is how I use Process:

fun main() {
    val process = Process()
    repeat(100) {
        println("-----------------------------")
        println("Start - ${System.currentTimeMillis()}")
        process.reset()
        process.doIt()
        val result = run {
            val count = process.getValue()
            if (count == 1) {
                println("Count - $count ${System.currentTimeMillis()}")
                true
            } else {
                println("Mutex failed in $it with $count")
                false
            }
        }
        if (!result) return@main
    }
}

I expect the log to contain 100 blocks like this:

-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035

But the test always stops at the third or fourth iteration:

-----------------------------
Start - 1645717971970
Process - value reset to 0
SCOPE - start at 1645717972011
Process - doIt start delay 1645717972011
Process - value increase to 1
SCOPE - end at 1645717972034
Process - getValue 1645717972034
Count - 1 1645717972035
-----------------------------
Start - 1645717972035
Process - value reset to 0
SCOPE - start at 1645717972036
Process - doIt start delay 1645717972036
Process - value increase to 1
SCOPE - end at 1645717972049
Process - getValue 1645717972049
Count - 1 1645717972050
-----------------------------
Start - 1645717972050
Process - value reset to 0
SCOPE - start at 1645717972050
Process - getValue 1645717972051
Process - doIt start delay 1645717972051
Mutex failed in 2 with 0

How should I synchronize so that getValue returns the result of asynchronous code that has already been started?

Upvotes: 0

Views: 325

Answers (1)

Sam

Reputation: 9944

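Have you considered using a Deferred result in place of your AtomicInteger? A Mutex only provides mutual exclusion, not ordering, so getValue can take the lock before the launched coroutine does, which is what your last log block shows. Awaiting a Deferred waits for the task itself to finish.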

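// These members replace value and mutex inside Process;
// scope is the existing CoroutineScope(Dispatchers.IO).
private var result: Deferred<Int> = CompletableDeferred(0)

fun doIt() {
    // async must be called on a CoroutineScope
    result = scope.async {
        delay(10) // stands in for the real background work
        1         // the new value
    }
}

// Blocks the caller until the task started by doIt has finished
fun getValue(): Int = runBlocking { result.await() }

fun reset() {
    // An already completed Deferred holding the initial value
    result = CompletableDeferred(0)
}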

You might need to introduce some more complexity if you need to handle cancellation of the previous job each time doIt is called.
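
For reference, here is a minimal, self-contained sketch of that idea, including one possible way to cancel the previous job. The class name DeferredProcess, the delay(10) and the constant result 1 are placeholders of mine, not part of your code, and the sketch assumes doIt, getValue and reset are all called from a single thread, as in your main:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Process class from the question.
class DeferredProcess {
    private val scope = CoroutineScope(Dispatchers.IO)

    // Assumption: only touched from the caller's thread in this sketch.
    private var result: Deferred<Int> = CompletableDeferred(0)

    /** Starts the background task, cancelling any task still in flight. */
    fun doIt() {
        result.cancel() // no-op if the previous Deferred already completed
        result = scope.async {
            delay(10) // placeholder for the real work
            1         // placeholder result
        }
    }

    /** Blocks the calling thread until the latest task has produced its value. */
    fun getValue(): Int = runBlocking { result.await() }

    /** Discards any pending task and resets the value to 0. */
    fun reset() {
        result.cancel()
        result = CompletableDeferred(0)
    }
}

fun main() {
    val process = DeferredProcess()
    repeat(100) { i ->
        process.reset()
        process.doIt()
        val count = process.getValue()
        check(count == 1) { "Unexpected value $count in iteration $i" }
    }
    println("All iterations returned 1")
}
```

Because getValue awaits the Deferred created by the most recent doIt call, it cannot return before that task has finished, so no Mutex is needed for ordering. If the methods may be called from several threads, the result field would need its own protection (for example @Volatile or a lock), which this sketch leaves out.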

Upvotes: 2
