Reputation: 8179
How do CoroutineScopes work?
Let's say I have an
enum class ConceptualPosition{
INVALID,
A,B
}
And let's assume I have an UI from which a user can click on either position, A
or B
.
I now want an Actor that receives user input but ignores it until an input is actually requested. For sake of simplicity, let's say there is just one way to request a position.
sealed class PositionRequest{
/**report the next position offered*/
object ForwardNext:PositionRequest()
}
So we may construct something like this:
fun CoroutineScope.positionActor(
offeredPosition:ReceiveChannel<ConceptualPosition>,
requests:ReceiveChannel<PositionRequest>,
output:SendChannel<ConceptualPosition>
) = launch{
var lastReceivedPosition = INVALID
var forwardNextReceived = 0
println("ACTOR: entering while loop")
while(true) {
select<Unit> {
requests.onReceive {
println("ACTOR: requests.onReceive($it)")
when (it) {
is PositionRequest.ForwardNext -> ++forwardNextReceived
}
}
offeredPosition.onReceive {
println("ACTOR: offeredPosition.onReceive($it)")
lastReceivedPosition = it
if (forwardNextReceived > 0) {
--forwardNextReceived
output.send(it)
}
}
}
}
}
And then build a facade to interact with it:
class BasicUI{
private val dispatcher = Dispatchers.IO
/*start a Position Actor that receives input from the UI and forwards them on demand*/
private val requests = Channel<PositionRequest>()
private val offeredPositions = Channel<ConceptualPosition>()
private val nextPosition = Channel<ConceptualPosition>()
init {
runBlocking(dispatcher){
positionActor(offeredPositions,requests,nextPosition)
}
}
/** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
offeredPositions.send(conceptualPosition)
}
/** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
requests.send(PositionRequest.ForwardNext)
nextPosition.receive()
}
}
Which of course doesn't work,because since runBlocking
is a CoroutineScope
, init
will not return until the coroutine launched by positionActor(offeredPositions,requests,nextPosition)
ends ... which is never because there's a while(true)
in it.
So what if we let the BasicUI
implement CoroutineScope
? After all, that is what Roman Elizarov said we should do at the KotlinConf, and if I understood him correctly, should bind the coroutine created by positionActor(...)
to the BasicUI
instance, rather than the runBlocking
-block.
Let's see ...
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext
class BasicUI:CoroutineScope{
private val dispatcher = Dispatchers.IO
private val job = Job()
override val coroutineContext: CoroutineContext
get() = job
/*start a Position Actor that receives input from the UI and forwards them on demand*/
private val requests = Channel<PositionRequest>()
private val offeredPositions = Channel<ConceptualPosition>()
private val nextPosition = Channel<ConceptualPosition>()
init {
positionActor(offeredPositions,requests,nextPosition)
}
/** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
offeredPositions.send(conceptualPosition)
}
/** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
requests.send(PositionRequest.ForwardNext)
nextPosition.receive()
}
}
Let's build a small testcase: I'll offer the actor a few A
s that he should ignore, then start a coroutine that continuously offers B
s, one of which will be returned to me when I ask the actor for a position.
import ConceptualPosition.*
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
fun main(args: Array<String>) = runBlocking{
val ui = BasicUI()
println("actor engaged")
//these should all be ignored
repeat(5){ui.offerPosition(A)}
println("offered some 'A's")
//keep offering 'B' so that eventually, one will be offered after we request a position
async { while(true){ui.offerPosition(B)} }
//now get a 'B'
println("requesting a position")
val pos = ui.getPosition()
println("received '$pos'")
}
This results in
actor engaged
ACTOR: entering while loop
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
offered some 'A's
ACTOR: offeredPosition.onReceive(A)
requesting a position
ACTOR: requests.onReceive(PositionRequest$ForwardNext@558da0e9)
... and nothing.
Apparently, B
is never offered -- and therefore, never forwarded -- which results in the main thread's blocking (as it should, in that situation).
I threw a
if(conceptualPosition == ConceptualPosition.B) throw RuntimeException("B offered?!")
into BasicUI.offerPosition
and there was no exception, so ...
At this point, I probably have to admit I don't understand Kotlin CoroutineScope
yet.
Why doesn't this example work?
Upvotes: 1
Views: 1770
Reputation: 25573
There seem to be two issues here:
offerPosition
/getPosition
are not suspend functions. Using runBlocking
is the wrong solution in most cases and should be used when having to interface with synchronous code or the main function.async
without any parameters is executed in the current CoroutineScope
. For your main function this is runBlocking
. The documentation actually describes the behaviour:The default CoroutineDispatcher for this builder in an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.
In plain terms, the async
block will not get a turn to execute in the event loop while other continuations are using it. Since getPosition
is blocking you block the event loop.
Replacing the blocking functions with suspend functions and withContext(dispatcher)
to dispatch on a different executor would allow the async function to run and the state to resolve eventually.
Upvotes: 2