Reputation: 1125
Can I use async {} inside a Kotlin Flow?
Scenario: After the API call I get a list of 200 objects that I need to parse (convert to UIObject). I am trying to process this list in parallel. Below is the pseudocode:
fun getUIObjectListFlow(): Flow<List<UIObject>> {
    return flow<List<UIObject>> {
        while (stream.hasNext()) {
            val data = stream.getData() // reading from an input stream; data comes in chunks
            val firstHalfDeferred = async(Dispatchers.IO) { /* process first half of the list that data obj contains */ }
            val secondHalfDeferred = async(Dispatchers.IO) { /* process second half of the list that data obj contains */ }
            val processedList = firstHalfDeferred.await() + secondHalfDeferred.await() // just pseudocode
            emit(processedList)
        }
    }
}
As async {} requires a coroutine scope (e.g. someScope.async {}), how can I get a scope inside a flow? Is there any other approach to accomplish this?
This function is in the repository, and I am calling it from the ViewModel.
Thanks
Upvotes: 8
Views: 9510
Reputation: 37650
(Original answer to the initial question)
As @broot mentioned in the comments, you don't need a Flow<T> if what you want is to produce a single item (even if that single item is a collection). In general, you'll simply want a suspend function (or a suspending piece of code in this case) instead of a function that returns a Flow.
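Now, whether you keep your single-item flow or not, you can use the coroutineScope { ... } suspending function to define a local scope from which you can start coroutines. This function does a few things: it provides a CoroutineScope (as this inside the block) that inherits the caller's context, it suspends until the block and every coroutine started in that scope have completed, it returns the value of the block's last expression, and if the block or any child coroutine fails, it cancels the remaining children and rethrows the exception.
Here is what it could look like: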
val uiObjects = coroutineScope { // this: CoroutineScope
    val list = getDataFromServer()
    val firstHalf = async(Dispatchers.IO) { /* process first half of the list */ }
    val secondHalf = async(Dispatchers.IO) { /* process second half of the list */ }
    // the last expression of the block is the value assigned to uiObjects
    firstHalf.await() + secondHalf.await()
}
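As a rough, self-contained sketch of the same idea wrapped in a suspend function: ApiObject, toUIObject() and parseInParallel are hypothetical names made up for illustration, since the question does not show the real types or parsing code. It also assumes the parsing is CPU-bound and therefore uses Dispatchers.Default rather than the Dispatchers.IO used above; swap it back if your processing actually blocks on I/O.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical types standing in for the API model and the UI model.
data class ApiObject(val id: Int)
data class UIObject(val label: String)

// Hypothetical parsing step; the real conversion is not shown in the question.
fun ApiObject.toUIObject(): UIObject = UIObject("item-$id")

// Suspends until both halves are processed, then returns them in the original order.
suspend fun parseInParallel(items: List<ApiObject>): List<UIObject> = coroutineScope {
    val middle = items.size / 2
    val firstHalf = async(Dispatchers.Default) { items.subList(0, middle).map { it.toUIObject() } }
    val secondHalf = async(Dispatchers.Default) { items.subList(middle, items.size).map { it.toUIObject() } }
    // Last expression of the block is the result of coroutineScope.
    firstHalf.await() + secondHalf.await()
}

fun main() = runBlocking {
    val parsed = parseInParallel((1..200).map { ApiObject(it) })
    println(parsed.size)    // 200
    println(parsed.first()) // UIObject(label=item-1)
}
```

Because the function is a plain suspend function, a ViewModel could call it from its own scope without any Flow involved.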
EDIT: given the question update, here is some updated code. You should still use coroutineScope to create a local scope for your short-lived coroutines:
fun getUIObjectListFlow(): Flow<List<UIObject>> = flow<List<UIObject>> {
    while (stream.hasNext()) {
        val data = stream.getData() // reading from an input stream; data comes in chunks
        val processedList = coroutineScope {
            val firstHalfDeferred = async(Dispatchers.IO) { /* process first half of the list that data obj contains */ }
            val secondHalfDeferred = async(Dispatchers.IO) { /* process second half of the list that data obj contains */ }
            firstHalfDeferred.await() + secondHalfDeferred.await()
        }
        // emit from the flow's own coroutine, after both halves are done
        emit(processedList)
    }
}
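For something you can actually run, here is a minimal sketch of the same pattern. It makes a few assumptions that are not in the question: the input stream is replaced by a plain Iterator of chunks, the items are Ints, uiObjectListFlow and its parts parameter are hypothetical names, and the work uses Dispatchers.Default on the assumption that the conversion is CPU-bound. It also generalizes the two halves to any number of slices with chunked and awaitAll.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical UI model; the real one is not shown in the question.
data class UIObject(val label: String)

// `chunks` is a hypothetical stand-in for the input stream: each element is one chunk of raw items.
fun uiObjectListFlow(chunks: Iterator<List<Int>>, parts: Int = 2): Flow<List<UIObject>> = flow {
    while (chunks.hasNext()) {
        val data = chunks.next()
        val processed = coroutineScope {
            // Split the chunk into at most `parts` slices and process each in its own coroutine.
            val sliceSize = maxOf(1, (data.size + parts - 1) / parts)
            data.chunked(sliceSize)
                .map { slice -> async(Dispatchers.Default) { slice.map { UIObject("item-$it") } } }
                .awaitAll() // results come back in slice order
                .flatten()
        }
        // emit is called from the flow's own coroutine, once all async work has finished
        emit(processed)
    }
}

fun main() = runBlocking {
    val chunks = listOf(listOf(1, 2, 3, 4), listOf(5, 6, 7)).iterator()
    val result = uiObjectListFlow(chunks).toList()
    println(result.map { it.size }) // [4, 3]
}
```

Note that emit stays outside the async blocks: only the processing runs on another dispatcher, while the emission happens in the flow's own coroutine, which is what the flow builder expects.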
Upvotes: 12