Reputation: 5859
I notice a strange behavior when trying to prematurely cancel from a Flow. Take a look at the following example.
This is a simple flow that emits integer values
private fun createFlow() = flow {
repeat(10000) {
emit(it)
}
}
Then I call the createFlow
function using this code
CoroutineScope(Dispatchers.Main).launch {
createFlow().collect {
Log.i("Main", "$it isActive $isActive")
if (it == 2) {
cancel()
}
}
}
This is what is printed out
0 isActive true
1 isActive true
2 isActive true
3 isActive false
4 isActive false
etc...etc
Now I would expect that the flow should stop emitting integers once it reaches the value of 2 but instead it actually switches the isActive flag to false and keeps emitting without otherwise stopping.
When I add a delay between emissions the flow behaves as I would expect.
private fun createFlow() = flow {
repeat(10000) {
delay(500) //add a delay
emit(it)
}
}
This is what is printed out after calling the flow again (which is the expected behaviour).
0 isActive true
1 isActive true
2 isActive true
What can I do to cancel the flow emission exactly at the specified value without adding delay?
Upvotes: 26
Views: 18860
Reputation: 4776
I want to add that in 1.3.7 version emissions from flow builder now check cancellation status and are properly cancellable. So the code in question will work as expected
Upvotes: 14
Reputation: 3119
I came up with this recently
it seems that it will only actually cancel if it reaches a suspending point and in your code that emits there is no such point
to solve this either add yield() between emissions or some other suspending function like delay(100)
Upvotes: 1
Reputation: 5859
I came across a workaround in this related issue
I have replaced every single collect
with a safeCollect
function in my project:
/**
* Only proceed with the given action if the coroutine has not been cancelled.
* Necessary because Flow.collect receives items even after coroutine was cancelled
* https://github.com/Kotlin/kotlinx.coroutines/issues/1265
*/
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
collect {
coroutineContext.ensureActive()
action(it)
}
}
Upvotes: 18