noloman
noloman

Reputation: 11975

Android: collecting a Kotlin Flow inside another not emitting

I have got the following method:

    operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
        val jobDomainModelList = mutableListOf<JobDomainModel>()
        jobListingRepository.searchJobs(sanitizeSearchQuery(query))
            .collect { jobEntityList: List<JobEntity> ->
                for (jobEntity in jobEntityList) {
                    categoriesRepository.getCategoryById(jobEntity.categoryId)
                        .collect { categoryEntity ->
                            if (categoryEntity.categoryId == jobEntity.categoryId) {
                                jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
                            }
                        }
                }
                emit(jobDomainModelList)
            }
    }

It searches in a repository calling the search method that returns a Flow<List<JobEntity>>. Then for every JobEntity in the flow, I need to fetch from the DB the category to which that job belongs. Once I have that category and the job, I can convert the job to a domain model object (JobDomainModel) and add it to a list, which will be returned in a flow as the return object of the method.

The problem I'm having is that nothing is ever emitted. I'm not sure if I'm missing something from working with flows in Kotlin, but I don't fetch the category by ID (categoriesRepository.getCategoryById(jobEntity.categoryId)) it then works fine and the list is emitted.

Thanks a lot in advance!

Upvotes: 7

Views: 6488

Answers (2)

Qamar
Qamar

Reputation: 5137

Get an idea from the code below if your Kotlin coroutine flow gets lost with a continuation approximate peak alloc exception

fun test(obj1: Object,obj2: Object) = flow {
    emit(if (obj1 != null) repository.postObj(obj1).first() else IgnoreObjResponse)
}.map { Pair(it, repository.postObj(obj2).first()) }

Upvotes: 0

Tenfour04
Tenfour04

Reputation: 93834

I think the problem is that you're collecting infinite length Flows, so collect never returns. You should use .take(1) to get a finite Flow before collecting it, or use first().

The Flows returned by your DAO are infinite length. The first value is the first query made, but the Flow will continue forever until cancelled. Each item in the Flow is a new query made when the contents of the database change.

Something like this:

operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
    jobListingRepository.searchJobs(sanitizeSearchQuery(query))
        .map { jobEntityList: List<JobEntity> ->
            jobEntityList.mapNotNull { jobEntity ->
                categoriesRepository.getCategoryById(jobEntity.categoryId)
                    .first()
                    .takeIf { it.categoryId == jobEntity.categoryId }
            }
        }

Alternatively, in your DAO you could make a suspend function version of getCategoryById() that simply returns the list.

Upvotes: 6

Related Questions