Reputation: 405
I'm trying to implement kind of trivial use case with coroutines: sending parallel requests and then wait until all come back and merge the results in one list. I use the logic as below but somehow it does not wait for all responses but finishes (goes to flatten()) after first one is done. What am I doing wrong?
fun run() {
GlobalScope.launch {
running = true
results =
providers
.map { provider -> async { provider.retrieve() } }
.map { retrieval ->
try {
withTimeout(2000L) {
retrieval.await()
}
} catch (ex: CancellationException) {
arrayListOf<Pair<String, String>>()
}
}
.flatten()
running = false
notifyObservers()
}
}
Upvotes: 1
Views: 1460
Reputation: 2680
I can not see anything wrong in your code. Anyway lets see what I can do, maybe it helps.
I assume your Provider retrieve()
returns a list of some type T
.
Let's create a similar Provider class with a suspending function returning a list of Int
:
class Provider(val name: String) {
suspend fun execute(): List<Int>
}
Lets then create a list of 3 Providers:
val providers: List<Provider> = listOf(Provider("p1"), Provider("p2"), Provider("p3"))
Using the map()
function on list we wrap them in a Deferred
using async()
function:
val deferredList: List<Deferred<Int>> = providers.map { provider ->
async { provider.execute() }
}
Execute Deferred
Now we have two options, either we do another map
operation and call await()
on each Deferred
:
val result: List<List<Int>> = deferredList.map { it.await } }
or we use the extension function awaitAll() and get the actual result as a list of Integers:
val result: List<List<Int>> = deferredList.awaitAll()
Then we can flatten the result using flatten()
Put all together
Lets create one function that gets a list of providers and returns a list of Int
, once all suspending calls are done.
suspend fun executeAllProvidersConcurrently(providers: List<Provider>): List<Int> = withContext(Dispatchers.Default){
return@withContext providers.map {
async { it.execute() }
}.awaitAll().flatten()
}
launch {
println(executeAllProvidersConcurrently(providers))
}
As you can see I didn't do that much differently. I have created a Gist where you can get the full sample code and run it yourself.
Hope this will help you running Coroutines concurrently and fetch the result of all.
Upvotes: 1