Nemo
Nemo

Reputation: 921

How to suspend kotlin coroutine until notified

I would like to suspend a Kotlin coroutine until a method is called from outside, just like the old Java object.wait() and object.notify() methods. How do I do that?

Here: Correctly implementing wait and notify in Kotlin is an answer how to implement this with Kotlin threads (blocking). And here: Suspend coroutine until condition is true is an answer how to do this with CompleteableDeferred but I do not want to have to create a new instance of CompleteableDeferred every time.

I am doing this currently:

    var nextIndex = 0

    fun handleNext(): Boolean {
        if (nextIndex < apps.size) {
            //Do the actual work on apps[nextIndex]
            nextIndex++
        }
        //only execute again if nextIndex is a valid index
        return nextIndex < apps.size
    }

    handleNext()

    // The returned function will be called multiple times, which I would like to replace with something like notify()
    return ::handleNext

Source

Upvotes: 31

Views: 17405

Answers (4)

rrain
rrain

Reputation: 51

Using Mutex & Shared Flow it is possible. I made an example where only one task with the same id is executing at a time. Ids are stored in the set. If id in the set, then task with this id is executing.

private class SharedResourceCoroutinesSync {

    private val ids: MutableSet<String> = mutableSetOf()

    // passes only one coroutine at a time
    private val mutex = Mutex()
    // Shared flow gives all waiters its values
    private val signal = MutableSharedFlow<Unit>(0,1,BufferOverflow.DROP_OLDEST)

    // only one task with the same id can be executed at a time
    suspend fun submitTaskById(
        id: String,
        task: suspend ()->Unit
    ): Job = coroutineScope {

        while (true){

            // analogue of entering java synchronized block
            mutex.lock() // only one coroutine at once can pass, others suspends

            if (id !in ids){ // check if this id is busy

                ids += id // if not busy then make it busy

                // analogue of leaving java synchronized block
                mutex.unlock() // allow other coroutines to check

                break // go out from loop to do task
            }

            // if this id is busy, we need to wait, and need to be notified

            // subscribe on shared flow events, then unlock mutex
            // subscribe BEFORE unlock, so other coroutines can't push notifyAll events before we can receive them
            signal.onSubscription {
                // analogue of leaving java synchronized block
                mutex.unlock()
            }
                .take(1)
                .collect() // analogue of java blocking wait()
        }

        // returns Job
        launch {
            task() // executes some task
            mutex.withLock { // analogue of java synchronized block
                ids -= id // release this id
                signal.emit(Unit) // analogue of java notifyAll()
            }
        }
    }
}

Upvotes: 0

funct7
funct7

Reputation: 3591

I suggest using a CompletableJob for that.

My use case:

suspend fun onLoad() {
    var job1: CompletableJob? = Job()
    var job2: CompletableJob? = Job()

    lifecycleScope.launch {
        someList.collect {
            doSomething(it)
            job1?.complete()
        }
    }

    lifecycleScope.launch {
        otherList.collect {
            doSomethingElse(it)
            job2?.complete()
        }
    }

    joinAll(job1!!, job2!!) // suspends until both jobs are done

    job1 = null
    job2 = null

    // Do something one time
}

Upvotes: 3

Alexey Romanov
Alexey Romanov

Reputation: 170735

Channels can be used for this (though they are more general):

When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

So create

val channel = Channel<Unit>(0)

And use channel.receive() for object.wait(), and channel.offer(Unit) for object.notify() (or send if you want to wait until the other coroutine receives).

For notifyAll, you can use BroadcastChannel instead.

You can of course easily encapsulate it:

inline class Waiter(private val channel: Channel<Unit> = Channel<Unit>(0)) {

    suspend fun doWait() { channel.receive() }
    fun doNotify() { channel.offer(Unit) }
}

Upvotes: 35

Eugene Petrenko
Eugene Petrenko

Reputation: 4992

It is possible to use the basic suspendCoroutine{..} function for that, e.g.

class SuspendWait() {
  private lateinit var myCont: Continuation<Unit>
  suspend fun sleepAndWait() = suspendCoroutine<Unit>{ cont ->
    myCont = cont
  }

  fun resume() {
    val cont = myCont
    myCont = null
    cont.resume(Unit)
  }
}

It is clear, the code have issues, e.g. myCont field is not synchonized, it is expected that sleepAndWait is called before the resume and so on, hope the idea is clear now.

There is another solution with the Mutex class from the kotlinx.coroutines library.

class SuspendWait2 {
  private val mutex = Mutex(locaked = true)
  suspend fun sleepAndWait() = mutex.withLock{}
  fun resume() {
    mutex.unlock()
  }
}

Upvotes: 13

Related Questions