SteveNikks
SteveNikks

Reputation: 303

Kotlin Coroutines - How to block to await/join all jobs?

I am new to Kotlin/Coroutines, so hopefully I am just missing something/don't fully understand how to structure my code for the problem I am trying to solve.

Essentially, I am taking a list of strings, and for each item in the list I want to send it to another method to do work (make a network call and return data based on the response). (Edit:) I want all calls to launch concurrently, and block until all calls are done/the response is acted on, and then return a new list with the info of each response.

I probably don't yet fully understand when to use launch/async, but I've tried to following with both launch (with joinAll), and async (with await).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

What I am expecting to happen, is if my lstInputs is a size of 120, when all jobs are joined, my lstOfReturnData should also have a size of 120.

What actually is happening is inconsitent results. I'll run it once, and I get 118 in my final list, run it again, it's 120, run it again, it's 117, etc. In the networkCallToGetData() method, I am handling any exceptions, to at least return something for every request, regardless if the network call fails.

Can anybody help explain why I am getting inconsistent results, and what I need to do to ensure I am blocking appropriately and all jobs are being joined before moving on?

Upvotes: 14

Views: 25039

Answers (4)

Peter
Peter

Reputation: 405

You can map inputs to List<Defered> and then use awaitAll() to wait for all the responses.

This will produce List<Response> without the need for concurent "magic".

suspend fun processData(lstInputs: List<String>): List<Response> {
    return coroutineScope {
        lstInputs.map {
            async(Dispatchers.IO) {
                networkCallToGetData(it)
            }
        }
        .awaitAll()
    }
}

Upvotes: 0

Ryan Kenvin
Ryan Kenvin

Reputation: 206

mutableListOf() creates an ArrayList, which is not thread-safe.
Try using ConcurrentLinkedQueue instead.

Also, do you use the stable version of Kotlin/Kotlinx.coroutine (not the old experimental one)? In the stable version, with the introduction of structured concurrency, there is no need to write jobs.joinAll anymore. launch is an extesion function of runBlocking which will launch new coroutines in the scope of the runBlocking and the runBlocking scope will automatically wait for all the launched jobs to finsish. So the code above can be shorten to

val lstOfReturnData = ConcurrentLinkedQueue<response>()
runBlocking {
        lstInputs.forEach {
            launch(Dispatches.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
}
return lstOfReturnData

Upvotes: 19

Sergio
Sergio

Reputation: 30745

runBlocking blocks current thread interruptibly until its completion. I guess it's not what you want. If I think wrong and you want to block the current thread than you can get rid of coroutine and just make network call in the current thread:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 

But if it is not your intent you can do the following:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}

Upvotes: 4

TomH
TomH

Reputation: 2729

Runblocking should mean you don't have to call join. Launching a coroutine from inside a runblocking scope should do this for you. Have you tried just:

fun processData(lstInputs: List<String>): List<response> {

val lstOfReturnData = mutableListOf<response>()

runBlocking {
    lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
   } 
}

return lstofReturnData

Upvotes: 0

Related Questions