Marcin Small-ski

Reputation: 171

Kotlin flow - how to handle cancelation

I'm learning Kotlin coroutines and flows, and one thing is still unclear to me. When I have a long-running loop in a regular coroutine, I can use isActive or ensureActive to handle cancellation. Those are not defined for a flow, but the following code nevertheless finishes the flow properly:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory

private val logger = LoggerFactory.getLogger("Main")

fun main() {
    val producer = FlowProducer()
    runBlocking {
        producer
            .produce()
            .take(10)
            .collect {
                logger.info("Received $it")
            }
    }
    logger.info("done")
}


class FlowProducer {
    fun produce() = flow {
        try {
            var counter = 1
            while (true) {
                logger.info("Before emit")
                emit(counter++)
                logger.info("After emit")
            }
        } finally {
            logger.info("Producer has finished")
        }

    }.flowOn(Dispatchers.IO)
}

Why is that the case? Is it because emit is a suspending function that handles cancellation for me? What should I do when emit is called only conditionally? For example, suppose the loop actually polls records from Kafka and calls emit only when the received records are not empty. Then we can end up in this situation:

  1. We want 10 messages (take(10))
  2. There are exactly 10 messages on the Kafka topic
  3. Since there are no more messages, emit won't be called again, so even though we have received all the messages we want, the loop will keep wasting resources on unnecessary polling.

I'm not sure whether my understanding is correct. Should I call yield() on each loop iteration in that case?

Upvotes: 5

Views: 7506

Answers (3)

Saba

Reputation: 1541

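// some flow
.onCompletion { cause ->
    // cause is null when the flow completed normally, otherwise the exception that ended it
    // (CancellationException here is kotlinx.coroutines.CancellationException)
    val isCancelled = (cause is CancellationException)
}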

Upvotes: 1

Sam

Reputation: 9944

The important thing to remember here is that flows are "cold", at least in their simple form. What that means is that a flow isn't capable of doing any work except while you are actively consuming data from it. A cold flow doesn't have a coroutine associated with it. You can learn a little more from this blog post by Roman Elizarov.

When you call collect on a flow, control is transferred from the collector to the flow. This is what enables the flow to do work. The collector is effectively executing the code inside the flow. When the flow calls emit, control transfers back to the collector. If you're familiar with Kotlin's sequence builder, you can think of flows very similarly.

By definition, this means that if you stop collecting the flow, the flow stops doing any work. In your case, because you used take(10), the collector will stop executing the flow once it has received ten items. Because the collector is the thing that's actually executing the loop inside the flow, the loop doesn't continue to run when the collector is no longer collecting. Once you stop using the flow, it's just like an iterator that's no longer being iterated over. It can be garbage collected like any other object.
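To make the sequence analogy concrete, here is a minimal sketch that uses only the standard library's sequence builder. The names numbers and log are made up for illustration. The builder body only advances when the consumer asks for the next element, so with take(3) the loop body runs exactly three times even though it is written as while (true).

```kotlin
// Hypothetical producer: an endless sequence that records each step in `log`.
fun numbers(log: MutableList<String>): Sequence<Int> = sequence {
    var counter = 1
    while (true) {
        log += "before yield $counter"
        yield(counter++) // control goes back to the consumer here
    }
}

fun main() {
    val log = mutableListOf<String>()
    // The consumer drives the loop: it pulls three elements and then stops asking.
    val received = numbers(log).take(3).toList()
    println(received) // [1, 2, 3]
    println(log)      // [before yield 1, before yield 2, before yield 3]
}
```

The flow version behaves the same way as far as "who runs the loop" is concerned, with emit playing the role of yield.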

You asked whether you should call yield() inside your flow. There are some situations where this could be useful, and you can read more about flow cancellation checks in the docs. In your case, it's not necessary, because:

  1. The cancellation checks are only needed to detect when something has cancelled the coroutine that is executing the flow. When the flow aborts itself, such as when take(10) has emitted 10 items, it simply terminates normally, without cancelling any coroutines.
  2. The flow is built using emit, which already checks for cancellation.

Even when cancellation checks aren't required, it's still possible to create a flow that runs forever. As mentioned above, control only transfers back to the collector each time the flow calls emit. So if your flow runs indefinitely without calling emit, it will never return control back to the collector. This is the same as writing an infinite loop in normal code, and isn't particularly special to flows.
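For the case where something outside the flow cancels the collecting coroutine while the loop is not reaching emit, a sketch like the following may help. It assumes an explicit check with currentCoroutineContext().ensureActive() at the top of each iteration. FakeSource and pollingFlow are hypothetical stand-ins for a Kafka consumer, not real Kafka API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a Kafka consumer: three records, then empty polls forever.
class FakeSource {
    private var next = 1
    fun poll(): List<Int> = if (next <= 3) listOf(next++) else emptyList()
}

fun pollingFlow(source: FakeSource): Flow<Int> = flow {
    while (true) {
        // Throws CancellationException once the collecting coroutine is cancelled,
        // even if poll() keeps returning nothing and emit is never reached.
        currentCoroutineContext().ensureActive()
        source.poll().forEach { emit(it) }
    }
}

fun main(): Unit = runBlocking {
    val received = mutableListOf<Int>()
    // Collect on another thread so this one stays free to cancel the collector.
    val job = launch(Dispatchers.Default) {
        pollingFlow(FakeSource()).collect { received += it }
    }
    delay(100)
    job.cancelAndJoin() // without the ensureActive() call this would never return
    println(received)
}
```

yield() would also act as a cancellation check here; ensureActive() is used in this sketch because it only checks and does not give up the thread. A real polling loop would normally block or suspend inside poll rather than spin.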

Note that it is possible to create a hot flow that has a coroutine doing work in the background. In that case, you would need to make sure that the coroutine responds correctly to cancellation of the flow.

Upvotes: 6

Matt Timmermans

Reputation: 59174

Yes, emit will throw CancellationException when take cancels the flow.

The Kafka example you give will actually work, because take will cancel the flow at the end of the 10th emit, not at the start of the 11th.
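A small experiment along these lines can be used to check that timing. This is only a sketch: countingFlow and collectThree are illustrative names, and the flow is collected without flowOn so that everything runs in one coroutine.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical producer that records what happens around each emit.
fun countingFlow(events: MutableList<String>): Flow<Int> = flow {
    try {
        var n = 1
        while (true) {
            events += "before emit $n"
            emit(n)
            events += "after emit $n"
            n++
        }
    } finally {
        events += "finished"
    }
}

fun collectThree(events: MutableList<String>): List<Int> = runBlocking {
    countingFlow(events).take(3).toList()
}

fun main() {
    val events = mutableListOf<String>()
    println(collectThree(events))
    println(events)
}
```

If take aborts the flow from inside the last emit, the recorded events should contain "before emit 3" and "finished" but no "after emit 3", which is what I would expect with current kotlinx.coroutines versions.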

Upvotes: 0
