HelloCW
HelloCW

Reputation: 2335

Why can I cancel a Flow without either invoking yield or determining isActive() identification in Kotlin?

I have read the article.

There are two approaches to making computation code cancellable. The first one is to periodically invoke a suspending function that checks for cancellation. There is a yield function that is a good choice for that purpose. The other one is to explicitly check the cancellation status.

I know Flow is suspending functions.

I run Code B , and get Result B as I expected.

I think I can't making computation Code A cancellable, but in fact I can click "Stop" button to cancel Flow after I click "Start" button to emit Flow, why?

Code A

class HandleMeter: ViewModel() { 
    var currentInfo by mutableStateOf(2.0)

    private var myJob: Job?=null

    private fun soundDbFlow() = flow {
          while (true) {
             val data = (0..1000).random().toDouble()
             emit(data)
          }
       }

    fun calCurrentAsynNew() {
        myJob?.cancel()
        myJob = viewModelScope.launch(Dispatchers.IO) {
            soundDbFlow().collect {currentInfo=it }
        }
    }

    fun cancelJob(){
        myJob?.cancel()
    }
}

@Composable
fun Greeting(handleMeter: HandleMeter) {
    var currentInfo = handleMeter.currentInfo

    Column(
        modifier = Modifier.fillMaxSize(),
    ) {

        Text(text = "Current ${currentInfo}")
        Button(
            onClick = { handleMeter.calCurrentAsynNew() }
        ) {
            Text("Start")
        }
        Button(
            onClick = { handleMeter.cancelJob() }
        ) {
            Text("Stop")
        }
    }
}

Code B

import kotlinx.coroutines.*

fun main() = runBlocking {
    
    val job = launch(Dispatchers.IO) {
      cal()  
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() 
    println("main: Now I can quit.")
}

suspend fun cal()  {
   val startTime = System.currentTimeMillis()
   var nextPrintTime = startTime
   var i = 0
   while (i < 5) {     
        if ( System.currentTimeMillis() >= nextPrintTime) {
             println("job: I'm sleeping ${i++} ...")
             nextPrintTime += 500L
         }
   }
}

Result B

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

Add Content:

To Tenfour04: Thanks!

If the following content you said is true. I think Code C can be canceled when system finish the operation doBigBlockingCalculation() at one time, right? Why do I need Code D?

Since emit() is a suspend function, your Flow is able to interrupt and end the coroutine the next time the emit() function is called in that while loop.

Code C

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000).doBigBlockingCalculation()
         emit(data)
      }
 }.flowOn(Dispatchers.Default) // since the calculation is blocking

Code D

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000)
             .chunked(100_000)
             .flatMap {
                 it.doBigBlockingCalculation().also { yield() }
             }
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

Upvotes: 3

Views: 921

Answers (2)

Tenfour04
Tenfour04

Reputation: 93832

A Flow on its own is cold. Its a wrapper around some suspend functions that will run when collect() or some other terminal suspending function is called on the Flow.

In your Code A, when the Job is cancelled, it is cancelling the coroutine that called collect on the Flow. collect is a suspend function, so that cancellation will propagate down to the function you defined inside soundDbFlow(). Since emit() is a suspend function, your Flow is able to interrupt and end the coroutine the next time the emit() function is called in that while loop.

Here's an example for how you could use this knowledge:

Suppose your function had to do a very long calculation like this:

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000).doBigBlockingCalculation()
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

Now if you tried to cancel this flow, it would work, but since the data line is a very slow operation that is not suspending, the Flow will still complete this very long calculation for no reason, eating up resources for longer than necessary.

To resolve this problem, you could break your calculation up into smaller pieces with yield() calls in between. Then the Flow can be cancelled more promptly.

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000)
             .chunked(100_000)
             .flatMap {
                 it.doBigBlockingCalculation().also { yield() }
             }
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

Not a perfect example. It's kind of wasteful to chunk a big IntRange. An IntRange takes barely any memory, but chunked turns it into Lists containing every value in the range.

Upvotes: 4

Filip Petrovski
Filip Petrovski

Reputation: 444

It has to do with CoroutineScopes and children of coroutines. When a parent coroutine is canceled, all its children are canceled as well.

More here: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#children-of-a-coroutine

Upvotes: 0

Related Questions