Reputation: 11975
I have got the following method:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
val jobDomainModelList = mutableListOf<JobDomainModel>()
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.collect { jobEntityList: List<JobEntity> ->
for (jobEntity in jobEntityList) {
categoriesRepository.getCategoryById(jobEntity.categoryId)
.collect { categoryEntity ->
if (categoryEntity.categoryId == jobEntity.categoryId) {
jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
}
}
}
emit(jobDomainModelList)
}
}
It searches in a repository calling the search
method that returns a Flow<List<JobEntity>>
. Then for every JobEntity
in the flow, I need to fetch from the DB the category to which that job belongs. Once I have that category and the job, I can convert the job to a domain model object (JobDomainModel
) and add it to a list, which will be returned in a flow as the return object of the method.
The problem I'm having is that nothing is ever emitted. I'm not sure if I'm missing something from working with flows in Kotlin, but I don't fetch the category by ID (categoriesRepository.getCategoryById(jobEntity.categoryId)
) it then works fine and the list is emitted.
Thanks a lot in advance!
Upvotes: 7
Views: 6488
Reputation: 5137
Get an idea from the code below if your Kotlin coroutine flow gets lost with a continuation approximate peak alloc exception
fun test(obj1: Object,obj2: Object) = flow {
emit(if (obj1 != null) repository.postObj(obj1).first() else IgnoreObjResponse)
}.map { Pair(it, repository.postObj(obj2).first()) }
Upvotes: 0
Reputation: 93834
I think the problem is that you're collecting infinite length Flows, so collect
never returns. You should use .take(1)
to get a finite Flow before collecting it, or use first()
.
The Flows returned by your DAO are infinite length. The first value is the first query made, but the Flow will continue forever until cancelled. Each item in the Flow is a new query made when the contents of the database change.
Something like this:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.map { jobEntityList: List<JobEntity> ->
jobEntityList.mapNotNull { jobEntity ->
categoriesRepository.getCategoryById(jobEntity.categoryId)
.first()
.takeIf { it.categoryId == jobEntity.categoryId }
}
}
Alternatively, in your DAO you could make a suspend
function version of getCategoryById()
that simply returns the list.
Upvotes: 6