user15301088

Reputation:

Flow exception transparency is violated : Emission from another coroutine is detected

I'm trying to implement a NetworkBoundResource class in my project, and this is what I have so far. Fetching the response and caching both work correctly, but when I emit a value inside the flow builder, the app crashes with the error below.

Error I'm getting:

    Emission from another coroutine is detected.
    Child of ProducerCoroutine{Active}@df26eb9, expected child of FlowCoroutine{Active}@a0bb2fe.
    FlowCollector is not thread-safe and concurrent emissions are prohibited.
    To mitigate this restriction please use 'channelFlow' builder instead of 'flow')' has been detected.
                  Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                  For a more detailed explanation, please refer to Flow documentation.

NetworkBoundResource class:


    abstract class NetworkBoundResource<ResultType, RequestType> {

        fun invoke(): Flow<Resource<ResultType>> = flow {
            val rawData = loadFromDb()

            if (shouldFetch(rawData)) {
                fetchDataFromServer()
                    .onStart { emit(Resource.loading(rawData)) } // emit() causing issue
                    .catch { emit(Resource.error(it, null)) } // emit() causing issue
                    .collectLatest { }
            }
        }

        // Save API response result into the database
        protected abstract suspend fun cacheInDb(items: RequestType)

        // Need to fetch data from server or not.
        protected abstract fun shouldFetch(data: ResultType?): Boolean

        // Show cached data from the database.
        protected abstract suspend fun loadFromDb(): ResultType

        // Fetch the data from server.
        protected abstract suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>>

        // when the fetch fails.
        protected open fun onFetchFailed() {}
    }

Repository class:

    fun getCategories(): Flow<Resource<List<Category>>> {
        return object : NetworkBoundResource<List<Category>, List<Category>>() {

            override suspend fun cacheInDb(items: List<Category>) {
                withContext(Dispatchers.IO) { database.getCategories().insert(items) }
            }

            override fun shouldFetch(data: List<Category>?): Boolean {
                return true
            }

            override suspend fun loadFromDb(): List<Category> {
                return withContext(Dispatchers.IO) { database.getCategories().read() }
            }

            override suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>> {
                return flow { emit(RetrofitModule.getCategories()) }
            }

        }.invoke()
    }

ViewModel class:

    init {
        viewModelScope.launch {
            repository.getCategories().collectLatest {
                if(it.data!=null){
                    _categories.value = it.data
                    Log.d("appDebug", " ViewModel : $it")
                }
            }
        }
    }

Upvotes: 32

Views: 17803

Answers (2)

Canturk Karabulut

Reputation: 136

You can use channelFlow instead of flow. The error occurs because values can be emitted both from the try body and from the catch block, and the flow builder does not allow emissions that conflict in this way. Alternatively, you can keep flow and use its own catch operator instead of switching to channelFlow.

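    fun getAllItems(): Flow<Response<List<MarketEntity>>> {
        return channelFlow {
            try {
                // trySend() may be called from any coroutine inside channelFlow
                trySend(Response.Loading())
                val dbData = dao.getAllItems()
                dbData.collectLatest {
                    trySend(Response.Success(data = it))
                }
            } catch (e: Exception) {
                // e.message is nullable, so fall back to an empty string
                trySend(Response.Error(message = e.message ?: ""))
            }
            // Keeps the flow open until the collector cancels
            awaitClose()
        }
    }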
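The second option above (a plain flow with the catch operator) has no code in this answer, so here is a minimal sketch of one way it might look. The `Response` class below is a hypothetical stand-in for the wrapper used in the snippet above, and `source` stands in for `dao.getAllItems()`; neither is taken from the real project.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Response wrapper used in the answer.
sealed class Response<out T> {
    object Loading : Response<Nothing>()
    data class Success<T>(val data: T) : Response<T>()
    data class Error(val message: String) : Response<Nothing>()
}

// `source` stands in for dao.getAllItems(). Each emit() below targets the
// collector of the operator it is written in, so no value is emitted into
// a collector that belongs to a different coroutine.
fun <T> asResponses(source: Flow<T>): Flow<Response<T>> =
    source
        .map<T, Response<T>> { Response.Success(it) }
        .onStart { emit(Response.Loading) }                    // runs before the first value
        .catch { e -> emit(Response.Error(e.message ?: "")) }  // upstream failures only

fun main() = runBlocking {
    // Normal case: Loading followed by one Success per upstream value.
    println(asResponses(flowOf(1, 2)).toList())

    // Failure case: Loading followed by a single Error.
    println(asResponses(flow<Int> { throw IllegalStateException("boom") }).toList())
}
```

Note that `catch` only sees exceptions thrown upstream of it, which is why it is placed last in the chain.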

Upvotes: 0

Róbert Nagy

Reputation: 7632

As the exception says, a flow built with flow { } doesn't allow emit() to be called concurrently or from a different coroutine. You have two options:

  • Replace flow { } with channelFlow { } and send values with send() (Probably easier in your case)
  • Make sure no emit() is called concurrently
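As a rough illustration of the first option, the sketch below reshapes the question's `invoke()` around `channelFlow`. The `Resource` class and the `networkBound` function are hypothetical simplifications written for this example, not the asker's real types. In the original code, `collectLatest` appears to collect the upstream in a separate coroutine, which would explain why `emit()` inside `onStart` and `catch` is rejected, while `send()` has no such restriction.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's Resource wrapper.
sealed class Resource<out T> {
    data class Loading<T>(val data: T?) : Resource<T>()
    data class Success<T>(val data: T) : Resource<T>()
    data class Error(val cause: Throwable) : Resource<Nothing>()
}

// `cached` plays the role of loadFromDb() and `fetch` the role of
// fetchDataFromServer(). Inside channelFlow, send() resolves to the outer
// ProducerScope and may be called from any coroutine.
fun <T> networkBound(cached: T?, fetch: Flow<T>): Flow<Resource<T>> = channelFlow {
    fetch
        .onStart { send(Resource.Loading(cached)) }
        .catch { send(Resource.Error(it)) }
        .collectLatest { send(Resource.Success(it)) }
}

fun main() = runBlocking {
    // Successful fetch: a Loading value, then a Success value.
    println(networkBound(listOf("old"), flowOf(listOf("new"))).toList())

    // Failing fetch: a Loading value, then an Error value.
    val failing = flow<List<String>> { throw IllegalStateException("boom") }
    println(networkBound(listOf("old"), failing).toList())
}
```

For the second option, replacing `collectLatest` with a plain `collect`, or forwarding the chain with `emitAll`, keeps every emission in the collecting coroutine.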

Upvotes: 57
