ThanosFisherman
ThanosFisherman

Reputation: 5859

How to cancel/unsubscribe from coroutines Flow

I notice a strange behavior when trying to prematurely cancel from a Flow. Take a look at the following example.

This is a simple flow that emits integer values

  private fun createFlow() = flow {
        repeat(10000) {
            emit(it)
        }
    }

Then I call the createFlow function using this code

  CoroutineScope(Dispatchers.Main).launch {
            createFlow().collect {

                Log.i("Main", "$it isActive $isActive")
                if (it == 2) {
                    cancel()
                }
            }
        }

This is what is printed out

0 isActive true
1 isActive true
2 isActive true
3 isActive false
4 isActive false
etc...etc

Now I would expect that the flow should stop emitting integers once it reaches the value of 2 but instead it actually switches the isActive flag to false and keeps emitting without otherwise stopping.

When I add a delay between emissions the flow behaves as I would expect.

private fun createFlow() = flow {
    repeat(10000) {
        delay(500) //add a delay
        emit(it)
    }
}

This is what is printed out after calling the flow again (which is the expected behaviour).

0 isActive true
1 isActive true
2 isActive true

What can I do to cancel the flow emission exactly at the specified value without adding delay?

Upvotes: 26

Views: 18860

Answers (3)

i30mb1
i30mb1

Reputation: 4776

I want to add that in 1.3.7 version emissions from flow builder now check cancellation status and are properly cancellable. So the code in question will work as expected

Upvotes: 14

Cruces
Cruces

Reputation: 3119

I came up with this recently

it seems that it will only actually cancel if it reaches a suspending point and in your code that emits there is no such point

to solve this either add yield() between emissions or some other suspending function like delay(100)

Upvotes: 1

ThanosFisherman
ThanosFisherman

Reputation: 5859

I came across a workaround in this related issue

I have replaced every single collect with a safeCollect function in my project:

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after coroutine was cancelled
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive()
    action(it)
  }
}

Upvotes: 18

Related Questions