Reputation: 921
I would like to suspend a Kotlin coroutine until a method is called from outside, just like the old Java object.wait() and object.notify() methods. How do I do that?
Here: "Correctly implementing wait and notify in Kotlin" is an answer on how to implement this with Kotlin threads (blocking). And here: "Suspend coroutine until condition is true" is an answer on how to do this with CompletableDeferred, but I do not want to have to create a new instance of CompletableDeferred every time.
I am doing this currently:
var nextIndex = 0
fun handleNext(): Boolean {
    if (nextIndex < apps.size) {
        // Do the actual work on apps[nextIndex]
        nextIndex++
    }
    // only execute again if nextIndex is a valid index
    return nextIndex < apps.size
}
handleNext()
// The returned function will be called multiple times, which I would like to replace with something like notify()
return ::handleNext
Upvotes: 31
Views: 17405
Reputation: 51
This is possible using a Mutex and a SharedFlow. I made an example where only one task with a given id executes at a time. The ids are stored in a set: if an id is in the set, a task with that id is currently executing.
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

private class SharedResourceCoroutinesSync {
    private val ids: MutableSet<String> = mutableSetOf()
    // passes only one coroutine at a time
    private val mutex = Mutex()
    // shared flow gives all waiters its values
    private val signal = MutableSharedFlow<Unit>(0, 1, BufferOverflow.DROP_OLDEST)
    // only one task with the same id can be executed at a time
    suspend fun submitTaskById(
        id: String,
        task: suspend () -> Unit
    ): Job = coroutineScope {
        while (true) {
            // analogue of entering java synchronized block
            mutex.lock() // only one coroutine at once can pass, the others suspend
            if (id !in ids) { // check if this id is busy
                ids += id // if not busy then make it busy
                // analogue of leaving java synchronized block
                mutex.unlock() // allow other coroutines to check
                break // go out from loop to do task
            }
            // if this id is busy, we need to wait, and need to be notified
            // subscribe on shared flow events, then unlock mutex
            // subscribe BEFORE unlock, so other coroutines can't push notifyAll events before we can receive them
            signal.onSubscription {
                // analogue of leaving java synchronized block
                mutex.unlock()
            }
                .take(1)
                .collect() // analogue of java blocking wait()
        }
        // returns Job; note that coroutineScope waits for this child,
        // so the Job is already complete when the caller receives it
        launch {
            task() // executes some task
            mutex.withLock { // analogue of java synchronized block
                ids -= id // release this id
                signal.emit(Unit) // analogue of java notifyAll()
            }
        }
    }
}
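Stripped down to just the wait/notifyAll part, the signal flow above can be sketched on its own. This is only a minimal illustration, assuming kotlinx.coroutines is available; Broadcast, await, notifyAllWaiters and broadcastDemo are hypothetical names that do not appear in the answer, and the demo relies on runBlocking's single-threaded event loop so that yield() lets every waiter subscribe before the signal is sent.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: every coroutine suspended in await() is woken by one notifyAllWaiters().
class Broadcast {
    // replay = 0: a signal sent while nobody is subscribed is not delivered later
    private val signal = MutableSharedFlow<Unit>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Suspends until the next signal arrives.
    suspend fun await() { signal.first() }

    // Never suspends; with DROP_OLDEST the emit always succeeds.
    fun notifyAllWaiters() { signal.tryEmit(Unit) }
}

// Starts three waiters, signals once, and returns how many were woken.
fun broadcastDemo(): Int = runBlocking {
    val broadcast = Broadcast()
    var woken = 0
    val waiters = List(3) {
        launch {
            broadcast.await()
            woken++
        }
    }
    yield() // let all three waiters subscribe first
    broadcast.notifyAllWaiters()
    waiters.joinAll()
    woken
}

fun main() {
    println(broadcastDemo())
}
```

Without the Mutex and onSubscription handshake from the answer, a signal sent before a waiter has subscribed is simply missed, which is the race the full example guards against.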
Upvotes: 0
Reputation: 3591
I suggest using a CompletableJob for that.
My use case:
suspend fun onLoad() {
    var job1: CompletableJob? = Job()
    var job2: CompletableJob? = Job()
    lifecycleScope.launch {
        someList.collect {
            doSomething(it)
            job1?.complete()
        }
    }
    lifecycleScope.launch {
        otherList.collect {
            doSomethingElse(it)
            job2?.complete()
        }
    }
    joinAll(job1!!, job2!!) // suspends until both jobs are done
    job1 = null
    job2 = null
    // Do something one time
}
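Reduced to its core, the pattern is a standalone Job() that one coroutine joins and another completes. Here is a minimal, self-contained sketch of that idea, assuming kotlinx.coroutines is on the classpath; waitForSignal is a hypothetical name used only for illustration, and delay(50) stands in for real work.

```kotlin
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns the order in which the events happened.
fun waitForSignal(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val signal: CompletableJob = Job() // standalone job, not tied to any coroutine

    launch {
        delay(50)                      // stand-in for real work
        events += "signalling"
        signal.complete()              // wakes every coroutine suspended in join()
    }

    events += "waiting"
    signal.join()                      // suspends until complete() is called
    events += "resumed"
    events
}

fun main() {
    println(waitForSignal()) // prints [waiting, signalling, resumed]
}
```

Note that a completed Job cannot be reset, so this works as a one-shot signal: waiting again requires a fresh Job().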
Upvotes: 3
Reputation: 170735
Channels can be used for this (though they are more general):
When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.
So create
val channel = Channel<Unit>(0)
and use channel.receive() for object.wait(), and channel.trySend(Unit) for object.notify() (or send if you want to wait until the other coroutine receives). trySend replaces channel.offer(Unit), which has been deprecated since kotlinx.coroutines 1.5.
For notifyAll, you can use BroadcastChannel instead (it has since been deprecated in favor of SharedFlow).
You can of course easily encapsulate it:
// value class replaces the older `inline class` syntax
@JvmInline
value class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) {
    suspend fun doWait() { channel.receive() }
    fun doNotify() { channel.trySend(Unit) }
}
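To see the rendezvous behaviour in action, here is a small runnable sketch, assuming kotlinx.coroutines 1.5 or later (hence trySend rather than offer). ChannelWaiter and channelDemo are hypothetical names chosen so they do not clash with the Waiter class above. It shows that a notification sent while nobody is waiting is dropped, just as with object.notify().

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical wrapper around a rendezvous channel.
class ChannelWaiter {
    private val channel = Channel<Unit>(Channel.RENDEZVOUS) // capacity 0

    suspend fun doWait() { channel.receive() }

    // Returns true if a waiting coroutine was woken, false if nobody was waiting.
    fun doNotify(): Boolean = channel.trySend(Unit).isSuccess
}

fun channelDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val waiter = ChannelWaiter()

    // Nobody is receiving yet, so the notification is lost.
    log += "notify without waiter: ${waiter.doNotify()}"

    val job = launch {
        waiter.doWait()
        log += "woken"
    }
    yield() // let the launched coroutine reach receive()

    log += "notify with waiter: ${waiter.doNotify()}"
    job.join()
    log
}

fun main() {
    channelDemo().forEach(::println)
}
```

Returning the Boolean from doNotify is a design choice of this sketch: it lets the caller find out whether anyone was actually woken, which the original Waiter does not expose.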
Upvotes: 35
Reputation: 4992
It is possible to use the basic suspendCoroutine {..} function for that, e.g.
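import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

class SuspendWait {
    // nullable rather than lateinit, so it can be cleared in resume()
    private var myCont: Continuation<Unit>? = null
    suspend fun sleepAndWait() = suspendCoroutine<Unit> { cont ->
        myCont = cont
    }
    fun resume() {
        val cont = myCont
        myCont = null
        cont?.resume(Unit)
    }
}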
Clearly, this code has issues: the myCont field is not synchronized, sleepAndWait is expected to be called before resume, and so on. I hope the idea is clear, though.
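One way to address the synchronization and ordering issues is to keep the continuation in an AtomicReference and to use suspendCancellableCoroutine, so that a cancelled waiter is forgotten. The following is only a sketch under those assumptions, not the answer's own code: AtomicSuspendWait and atomicDemo are hypothetical names, and the class supports a single waiter at a time.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield

// Hypothetical variant of SuspendWait with an atomically published continuation.
class AtomicSuspendWait {
    private val waiter = AtomicReference<CancellableContinuation<Unit>?>(null)

    suspend fun sleepAndWait() = suspendCancellableCoroutine<Unit> { cont ->
        // Publish the continuation; fail fast if another coroutine is already waiting.
        check(waiter.compareAndSet(null, cont)) { "only one waiter is supported" }
        // Forget the continuation if the waiting coroutine is cancelled.
        cont.invokeOnCancellation { waiter.compareAndSet(cont, null) }
    }

    // Returns true if a waiter was resumed, false if nobody was waiting.
    fun resume(): Boolean {
        val cont = waiter.getAndSet(null) ?: return false
        cont.resume(Unit)
        return true
    }
}

fun atomicDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val gate = AtomicSuspendWait()

    // Calling resume() before anyone waits is a harmless no-op here.
    log += "early resume: ${gate.resume()}"

    val job = launch {
        gate.sleepAndWait()
        log += "woken"
    }
    yield() // let the launched coroutine suspend in sleepAndWait()

    log += "resume: ${gate.resume()}"
    job.join()
    log
}

fun main() {
    atomicDemo().forEach(::println)
}
```

As with object.notify(), a resume() issued before sleepAndWait() is not remembered, so the caller still has to establish the ordering if a wake-up must not be missed.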
There is another solution with the Mutex class from the kotlinx.coroutines library.
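import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
class SuspendWait2 {
    // starts locked, so the first sleepAndWait() suspends
    private val mutex = Mutex(locked = true)
    // withLock {} releases the lock again right away, so the mutex stays
    // unlocked after the first resume() and later calls return immediately
    suspend fun sleepAndWait() = mutex.withLock {}
    // unlock() throws IllegalStateException if the mutex is not locked
    fun resume() {
        mutex.unlock()
    }
}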
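Because withLock {} releases the mutex again as soon as it has been acquired, the class above behaves as a one-shot gate. If the gate should close again after each wake-up, one possible variation is to call lock() without a matching unlock(). This is a sketch of that idea rather than part of the answer; ReusableGate and gateDemo are hypothetical names.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield

// Hypothetical variant of SuspendWait2 that can be waited on repeatedly.
class ReusableGate {
    private val mutex = Mutex(locked = true)

    // lock() without a matching unlock(): the gate is closed again once we pass.
    suspend fun sleepAndWait() = mutex.lock()

    // Lets exactly one waiter through. Throws IllegalStateException if the gate
    // is already open, i.e. resume() was called twice with no wait in between.
    fun resume() = mutex.unlock()
}

fun gateDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val gate = ReusableGate()

    val job = launch {
        repeat(2) { round ->
            gate.sleepAndWait()
            log += "woken $round"
        }
    }
    yield()       // let the coroutine suspend on the gate
    gate.resume() // first wake-up
    yield()       // let it run and suspend on the gate again
    gate.resume() // second wake-up
    job.join()
    log
}

fun main() {
    gateDemo().forEach(::println)
}
```

Unlike notify(), a resume() with no waiter present is remembered here: it leaves the gate open, so the next sleepAndWait() passes without suspending.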
Upvotes: 13