Android14

Reputation: 1125

async{} inside flow

Can I use async {} inside a Kotlin flow?

Scenario: after the API call I get a list of 200 objects that I need to parse (convert to UIObject), and I am trying to process this list in parallel. Below is the pseudocode:

fun getUIObjectListFlow(): Flow<List<UIObject>> = flow {
    while (stream.hasNext()) {
        val data = stream.getData() // reading from an input stream; data comes in chunks

        // Does not compile as written: async needs a CoroutineScope receiver
        val firstHalfDeferred = async(Dispatchers.IO) { /* process first half of the list that data contains */ }
        val secondHalfDeferred = async(Dispatchers.IO) { /* process second half of the list that data contains */ }
        val processedList = firstHalfDeferred.await() + secondHalfDeferred.await() // just pseudocode

        emit(processedList)
    }
}

Since async {} requires a coroutine scope (e.g. someScope.async {}), how can I get a scope inside a flow? Is there another approach to accomplish this?

This function is in the repository, and I am calling it from the ViewModel.

Thanks

Upvotes: 8

Views: 9510

Answers (1)

Joffrey

Reputation: 37650

(Original answer to the initial question)

As @broot mentioned in the comments, you don't need a Flow<T> if what you want is to produce a single item (even if that single item is a collection). In general, you'll simply want a suspend function (or a suspending piece of code in this case) instead of a function that returns a Flow.
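
As a minimal sketch of that suggestion, a single-result load can be an ordinary suspend function. RawItem, UIObject, fetchRawItems() and loadUiObjects() are hypothetical names used only for illustration; fetchRawItems() stands in for the real API call.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical types, for illustration only.
data class RawItem(val id: Int)
data class UIObject(val label: String)

// Stand-in for the real API call (assumption): suspends briefly, then returns items.
suspend fun fetchRawItems(): List<RawItem> {
    delay(10)
    return (1..4).map { RawItem(it) }
}

// The single result is returned directly; no Flow is involved.
suspend fun loadUiObjects(): List<UIObject> =
    fetchRawItems().map { UIObject("item-${it.id}") }

fun main() = runBlocking {
    println(loadUiObjects().size)
}
```

The caller simply suspends until the list is ready, so there is nothing to collect.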

Now, whether you keep your single-item flow or not, you can use the coroutineScope { ... } suspending function to define a local scope from which you can start coroutines. This function does a few things:

  1. it provides a scope to start child coroutines
  2. it suspends until all child coroutines are done
  3. it returns a value based on the last expression in the block (the "return" value of the lambda)

Here is what it could look like:

val uiObjects = coroutineScope { // this: CoroutineScope
    val list = getDataFromServer()

    val firstHalf = async(Dispatchers.IO) { /* process first half of the list */ }
    val secondHalf = async(Dispatchers.IO) { /* process second half of the list */ }

    // the last expression from the block is what the uiObjects variable gets
    firstHalf.await() + secondHalf.await()
}

EDIT: given the question update, here is some updated code. You should still use coroutineScope to create a local scope for your short-lived coroutines:

fun getUIObjectListFlow(): Flow<List<UIObject>> = flow<List<UIObject>> {
    while (stream.hasNext()) {
        val data = stream.getData() // reading from an input stream; data comes in chunks

        // coroutineScope suspends until both async children have completed
        val processedList = coroutineScope {
            val firstHalfDeferred = async(Dispatchers.IO) { /* process first half of the list that data contains */ }
            val secondHalfDeferred = async(Dispatchers.IO) { /* process second half of the list that data contains */ }
            firstHalfDeferred.await() + secondHalfDeferred.await()
        }
        emit(processedList)
    }
}
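
If two halves are not enough, the same pattern might be generalized to any number of slices with chunked and awaitAll. This is a self-contained sketch under assumptions, not the code from the question: RawItem, UIObject, toUiObject() and uiObjectListFlow() are hypothetical, the batches parameter stands in for the chunks read from the input stream, and Dispatchers.Default is swapped in for Dispatchers.IO on the assumption that the parsing is CPU-bound.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical types, for illustration only.
data class RawItem(val id: Int)
data class UIObject(val label: String)

// Stand-in for the real parsing step (assumption).
fun RawItem.toUiObject() = UIObject("item-$id")

// `batches` stands in for the chunks read from the input stream (assumption).
fun uiObjectListFlow(batches: List<List<RawItem>>, parts: Int = 2): Flow<List<UIObject>> = flow {
    for (batch in batches) {
        val processed = coroutineScope {
            // Ceiling division, so the batch is split into at most `parts` slices.
            val sliceSize = maxOf(1, (batch.size + parts - 1) / parts)
            batch.chunked(sliceSize)
                .map { slice -> async(Dispatchers.Default) { slice.map { it.toUiObject() } } }
                .awaitAll() // results are returned in slice order
                .flatten()
        }
        // emit is called outside the async blocks, back in the flow's own context
        emit(processed)
    }
}

fun main() = runBlocking {
    val batches = listOf((1..4).map(::RawItem), (5..6).map(::RawItem))
    println(uiObjectListFlow(batches).toList())
}
```

Because awaitAll keeps the order of the deferred values, each emitted list preserves the order of its input batch.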

Upvotes: 12
