DeveloperKurt
DeveloperKurt

Reputation: 788

Kotlin Coroutines - Suspend function returning a Flow runs forever

I am making a network repository that supports multiple data retrieval configs, therefore I want to separate those configs' logic into functions.

However, I have a config that fetches the data continuously at specified intervals. Everything is fine when I emit those values to the original Flow. But when I take the logic into another function and return another Flow through it, it stops caring about its coroutine scope. Even after the scope's cancelation, it keeps on fetching the data.

TLDR: Suspend function returning a flow runs forever when currentCoroutineContext is used to control its loop's termination.

What am I doing wrong here? Here's the simplified version of my code:

Fragment calling the viewmodels function that basically calls the getData()

 lifecycleScope.launch {
            viewModel.getLatestDataList()
        }

Repository

suspend fun getData(config: MyConfig): Flow<List<Data>>
{
    return flow {

        when (config)
        {
            CONTINUOUS ->
            {
                //It worked fine when fetchContinuously was ingrained to here and emitted directly to the current flow
                //And now it keeps on running eternally
                fetchContinuously().collect { updatedList ->
                    emit(updatedList)
                }
            }
        }
    }
}


//Note logic of this function is greatly reduced to keep the focus on the problem
private suspend fun fetchContinuously(): Flow<List<Data>>
{
    return flow {
        while (currentCoroutineContext().isActive)
        {

            val updatedList = fetchDataListOverNetwork().await()

            if (updatedList != null)
            {
                emit(updatedList)
            }

            delay(refreshIntervalInMs)
        }

        Timber.i("Context is no longer active - terminating the continuous-fetch coroutine")
    }
}


private suspend fun fetchDataListOverNetwork(): Deferred<List<Data>?> =

    withContext(Dispatchers.IO) {

        return@withContext async {

            var list: List<Data>? = null

            try
            {
                val response = apiService.getDataList().execute()

                if (response.isSuccessful && response.body() != null)
                {
                    list = response.body()!!.list
                }
                else
                {
                    Timber.w("Failed to fetch data from the network database. Error body: ${response.errorBody()}, Response body: ${response.body()}")
                }
            }
            catch (e: Exception)
            {
                Timber.w("Exception while trying to fetch data from the network database. Stacktrace: ${e.printStackTrace()}")
            }
            finally
            {
                return@async list
            }
            list //IDE is not smart enough to realize we are already returning no matter what inside of the finally block; therefore, this needs to stay here
        }

    }

Upvotes: 6

Views: 20011

Answers (2)

ChristianB
ChristianB

Reputation: 2690

I am not sure whether this is a solution to your problem, but you do not need to have a suspending function that returns a Flow. The lambda you are passing is a suspending function itself:

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> (source)

Here is an example of a flow that repeats a (GraphQl) query (simplified - without type parameters) I am using:

    override fun query(query: Query,
                       updateIntervalMillis: Long): Flow<Result<T>> {
    return flow {
        // this ensures at least one query
        val result: Result<T> = execute(query)
        emit(result)

        while (coroutineContext[Job]?.isActive == true && updateIntervalMillis > 0) {
            delay(updateIntervalMillis)

            val otherResult: Result<T> = execute(query)
            emit(otherResult)
        }
    }
}

Upvotes: 8

Andrew Chelix
Andrew Chelix

Reputation: 1212

I'm not that good at Flow but I think the problem is that you are delaying only the getData() flow instead of delaying both of them. Try adding this:

suspend fun getData(config: MyConfig): Flow<List<Data>>
{
    return flow {

        when (config)
        {
            CONTINUOUS ->
            {
                fetchContinuously().collect { updatedList ->
                    emit(updatedList)
                    delay(refreshIntervalInMs)
                }
            }
        }
    }
}

Take note of the delay(refreshIntervalInMs).

Upvotes: 0

Related Questions