User1291
User1291

Reputation: 8179

Kotlin - coroutine scopes, why doesn't my async get executed?

How do CoroutineScopes work?

Let's say I have an

enum class ConceptualPosition{
    INVALID,
    A,B
}

And let's assume I have an UI from which a user can click on either position, A or B.

I now want an Actor that receives user input but ignores it until an input is actually requested. For sake of simplicity, let's say there is just one way to request a position.

sealed class PositionRequest{
    /**report the next position offered*/
    object ForwardNext:PositionRequest()
}

So we may construct something like this:

fun CoroutineScope.positionActor(
        offeredPosition:ReceiveChannel<ConceptualPosition>,
        requests:ReceiveChannel<PositionRequest>,
        output:SendChannel<ConceptualPosition>
) = launch{
    var lastReceivedPosition = INVALID
    var forwardNextReceived = 0

    println("ACTOR: entering while loop")
    while(true) {
        select<Unit> {
            requests.onReceive {
                println("ACTOR: requests.onReceive($it)")
                when (it) {
                    is PositionRequest.ForwardNext -> ++forwardNextReceived
                }
            }

            offeredPosition.onReceive {
                println("ACTOR: offeredPosition.onReceive($it)")
                lastReceivedPosition = it
                if (forwardNextReceived > 0) {
                    --forwardNextReceived
                    output.send(it)
                }
            }
        }
    }
}

And then build a facade to interact with it:

class BasicUI{
    private val dispatcher = Dispatchers.IO

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        runBlocking(dispatcher){
            positionActor(offeredPositions,requests,nextPosition)
        }
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

Which of course doesn't work,because since runBlocking is a CoroutineScope, init will not return until the coroutine launched by positionActor(offeredPositions,requests,nextPosition) ends ... which is never because there's a while(true) in it.

So what if we let the BasicUI implement CoroutineScope? After all, that is what Roman Elizarov said we should do at the KotlinConf, and if I understood him correctly, should bind the coroutine created by positionActor(...) to the BasicUI instance, rather than the runBlocking-block.

Let's see ...

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext

class BasicUI:CoroutineScope{

    private val dispatcher = Dispatchers.IO

    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = job

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        positionActor(offeredPositions,requests,nextPosition)
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

Let's build a small testcase: I'll offer the actor a few As that he should ignore, then start a coroutine that continuously offers Bs, one of which will be returned to me when I ask the actor for a position.

import ConceptualPosition.*
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun main(args: Array<String>) = runBlocking{
    val ui = BasicUI()
    println("actor engaged")

    //these should all be ignored
    repeat(5){ui.offerPosition(A)}
    println("offered some 'A's")

    //keep offering 'B' so that eventually, one will be offered after we request a position
    async { while(true){ui.offerPosition(B)} }

    //now get a 'B'
    println("requesting a position")
    val pos = ui.getPosition()
    println("received '$pos'")
}

This results in

actor engaged
ACTOR: entering while loop
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
offered some 'A's
ACTOR: offeredPosition.onReceive(A)
requesting a position
ACTOR: requests.onReceive(PositionRequest$ForwardNext@558da0e9)

... and nothing.

Apparently, B is never offered -- and therefore, never forwarded -- which results in the main thread's blocking (as it should, in that situation).

I threw a

if(conceptualPosition == ConceptualPosition.B) throw RuntimeException("B offered?!")

into BasicUI.offerPosition and there was no exception, so ...

At this point, I probably have to admit I don't understand Kotlin CoroutineScope yet.

Why doesn't this example work?

Upvotes: 1

Views: 1770

Answers (1)

Kiskae
Kiskae

Reputation: 25573

There seem to be two issues here:

  1. offerPosition/getPosition are not suspend functions. Using runBlocking is the wrong solution in most cases and should be used when having to interface with synchronous code or the main function.
  2. async without any parameters is executed in the current CoroutineScope. For your main function this is runBlocking. The documentation actually describes the behaviour:

The default CoroutineDispatcher for this builder in an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.

In plain terms, the async block will not get a turn to execute in the event loop while other continuations are using it. Since getPosition is blocking you block the event loop.

Replacing the blocking functions with suspend functions and withContext(dispatcher) to dispatch on a different executor would allow the async function to run and the state to resolve eventually.

Upvotes: 2

Related Questions