Andy Victors

Reputation: 405

Parallel coroutines execution with timeout and combine results at the end

I'm trying to implement a fairly trivial use case with coroutines: sending parallel requests, waiting until all of them come back, and merging the results into one list. I use the logic below, but somehow it does not wait for all responses and instead finishes (goes to flatten()) after the first one is done. What am I doing wrong?

fun run() {
    GlobalScope.launch  {
        running = true
        results =
        providers
                .map { provider -> async { provider.retrieve() } }
                .map { retrieval ->
                    try {
                        withTimeout(2000L) {
                            retrieval.await()
                        }
                    } catch (ex: CancellationException) {
                        arrayListOf<Pair<String, String>>()
                    }
                }
                .flatten()
        running = false
        notifyObservers()
    }
}

Upvotes: 1

Views: 1460

Answers (1)

ChristianB

Reputation: 2680

I cannot see anything wrong with your code. Anyway, let's see what I can do; maybe it helps.

I assume your Provider's retrieve() returns a list of some type T.

Let's create a similar Provider class with a suspending function returning a list of Int:

class Provider(val name: String) {
  // Placeholder body; a real provider would suspend here on a network call.
  suspend fun execute(): List<Int> = listOf(1, 2, 3)
}

Let's then create a list of three Providers:

val providers: List<Provider> = listOf(Provider("p1"), Provider("p2"), Provider("p3"))

Using map() on the list, we wrap each call in a Deferred with async():

// async must be called inside a CoroutineScope (e.g. runBlocking or coroutineScope).
val deferredList: List<Deferred<List<Int>>> = providers.map { provider ->
  async { provider.execute() }
}

Execute Deferred

Now we have two options. Either we do another map operation and call await() on each Deferred:

val result: List<List<Int>> = deferredList.map { it.await() }

or we use the extension function awaitAll() and get the results as a list of lists of Int:

val result: List<List<Int>> = deferredList.awaitAll()

Then we can flatten the result into a single List<Int> using flatten().
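
To see these steps end to end, here is a small self-contained sketch. It assumes kotlinx.coroutines is on the classpath; fetch() and its delays are hypothetical stand-ins for provider.execute(), not part of the original code.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for Provider.execute(): waits, then returns sample data.
suspend fun fetch(id: Int): List<Int> {
    delay(100L * id)
    return listOf(id, id * 10)
}

fun main() = runBlocking {
    // Start all requests first so they run concurrently.
    val deferredList: List<Deferred<List<Int>>> = (1..3).map { id -> async { fetch(id) } }

    // awaitAll() suspends until every Deferred has completed.
    val nested: List<List<Int>> = deferredList.awaitAll()

    // flatten() merges the inner lists, keeping the order of deferredList.
    val merged: List<Int> = nested.flatten()
    println(merged) // [1, 10, 2, 20, 3, 30]
}
```

One difference between the two options: awaitAll() fails as soon as any Deferred fails, while map { it.await() } awaits them one after another in list order.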

Putting it all together

Let's create one function that takes a list of providers and returns a list of Int once all suspending calls are done.

suspend fun executeAllProvidersConcurrently(providers: List<Provider>): List<Int> = withContext(Dispatchers.Default){
    return@withContext providers.map {
        async { it.execute() }
    }.awaitAll().flatten()
}

launch {
  println(executeAllProvidersConcurrently(providers))
}

As you can see, I didn't do much differently. I have created a Gist where you can get the full sample code and run it yourself.
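
The question also asks for a timeout, which the function above does not apply. The following is only a sketch of one possible way to add it, assuming an empty list is an acceptable result for a provider that is too slow. SlowProvider, executeAllWithTimeout, and the delay values are made up for illustration; withTimeoutOrNull is the kotlinx.coroutines function that returns null instead of throwing when the time runs out.

```kotlin
import kotlinx.coroutines.*

// Hypothetical provider: delayMs simulates how long the request takes.
class SlowProvider(val name: String, private val delayMs: Long) {
    suspend fun execute(): List<Int> {
        delay(delayMs)
        return listOf(name.length)
    }
}

// Each provider gets its own time budget; one that is too slow contributes an empty list.
suspend fun executeAllWithTimeout(providers: List<SlowProvider>, timeoutMs: Long): List<Int> =
    coroutineScope {
        providers.map { provider ->
            async { withTimeoutOrNull(timeoutMs) { provider.execute() } ?: emptyList() }
        }.awaitAll().flatten()
    }

fun main() = runBlocking {
    val providers = listOf(SlowProvider("fast", 50L), SlowProvider("slower", 5_000L))
    println(executeAllWithTimeout(providers, 500L)) // [4]
}
```

Because the timeout wraps the work inside each async block, the time budgets run in parallel rather than one after another, so the whole call takes roughly timeoutMs at most.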

I hope this helps you run coroutines concurrently and collect all of their results.

Upvotes: 1
