4face

Reputation: 589

Coroutine: Deferred operations in a List run sequentially.

I have a List of parameters for the downloads. I map each element of that list to a Deferred that executes the download; then, for each element of the List, I call await, but apparently the downloads are executed sequentially.

This is my function:

suspend fun syncFiles() = coroutineScope {
    remoteRepository.requiredFiles()
        .filter { localRepository.needToDownload( it.name, it.md5 ) }
        .map { async { downloader( it ) } }
        .forEach { deferredResult ->
            when ( val result = deferredResult.await() ) {
                is DownloadResult.Layout ->  localRepository.storeLayout( result.content )
                is DownloadResult.StringR -> localRepository.storeFile( result )
            }
        }
}

This is my test:

private val useCase = SyncUseCaseImpl.Factory(
        mockk { // downloader
            coEvery { this@mockk.invoke( any() ) } coAnswers { delay(1000 );any() }
        },
        ...
    ).newInstance()

@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
    val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
    assert( requiredFilesCount ).isEqualTo( 3 )

    val time = measureTimeMillis {
        useCase.syncFiles()
    }

    assert( time ).isBetween( 1000, 1100 )
}

And this is my result: expected to be between:<1000L> and <1100L> but was:<3081L>

I find this weird, because these two dummy tests complete correctly. Maybe I am missing something?

@Test // OK
fun test() = runBlocking {
    val a = async { delay(1000 ) }
    val b = async { delay(1000 ) }
    val c = async { delay(1000 ) }

    val time = measureTimeMillis {
        a.await()
        b.await()
        c.await()
    }

    assert( time ).isBetween( 1000, 1100 )
}

@Test // OK
fun test() = runBlocking {
    val wasteTime: suspend () -> Unit = { delay(1000 ) }
    suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
            .map { async { it() } }
            .forEach { it.await() }

    val time = measureTimeMillis {
        wasteTimeConcurrently()
    }

    assert( time ).isBetween( 1000, 1100 )
}

Upvotes: 1

Views: 2021

Answers (2)

Lamberto Basti

Reputation: 476

This may happen if the job blocks the entire thread, for example an IO-bound task that blocks the thread it runs on and thereby blocks all the other coroutines on that thread. If you are using Kotlin/JVM, try calling async(Dispatchers.IO) { } to run the coroutine on the IO dispatcher, which is intended for offloading blocking work to a shared pool of threads, so a job that blocks its thread no longer stalls the other coroutines.

Have a look here for other dispatchers: https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#dispatchers-and-threads
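
To illustrate the suggestion, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. blockingDownload and downloadAllOnIo are hypothetical names that are not part of the question; Thread.sleep stands in for any call that blocks its thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a download that blocks its thread (not from the question).
fun blockingDownload(name: String): String {
    Thread.sleep(200)
    return "content of $name"
}

// Runs every download in its own coroutine on Dispatchers.IO and returns
// the results together with the elapsed wall-clock time in milliseconds.
fun downloadAllOnIo(names: List<String>): Pair<List<String>, Long> = runBlocking {
    var results = emptyList<String>()
    val elapsed = measureTimeMillis {
        results = names
            .map { name -> async(Dispatchers.IO) { blockingDownload(name) } }
            .awaitAll()
    }
    results to elapsed
}

fun main() {
    val (results, elapsed) = downloadAllOnIo(listOf("a", "b", "c"))
    println("$results in $elapsed ms")
}
```

Because each blocking call gets its own thread from the IO pool, the three downloads should overlap, so the elapsed time should stay well below the sum of the individual sleeps.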

Upvotes: 1

Rene

Reputation: 6148

The problem is in mockk.

If you look at the code of the coAnswers function, you will find this (API.kt + InternalPlatformDsl.kt):

infix fun coAnswers(answer: suspend MockKAnswerScope<T, B>.(Call) -> T) = answers {
    InternalPlatformDsl.runCoroutine {
        answer(it)
    }
}

And runCoroutine looks like this:

actual fun <T> runCoroutine(block: suspend () -> T): T {
    return runBlocking {
        block()
    }
}

As you can see, coAnswers is a non-suspending function and starts a new coroutine with runBlocking.

Let's look at an example:

val mock = mockk<Downloader> {
    coEvery {
        this@mockk.download()
    } coAnswers {
        delay(1000)
    }
}

val a = async {
    mock.download()
}

When mockk executes the coAnswers block (delay()), it starts an artificial coroutine scope, executes the given block, and waits, blocking the current thread with runBlocking, until this block has finished. So the answer block only returns after delay(1000) has finished.

This means all coroutines that run through coAnswers are executed sequentially.
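
The effect of an answer that blocks its thread can be reproduced without mockk. The following is only a rough sketch, assuming kotlinx.coroutines is available: blockingAnswer, suspendingAnswer and timeThree are hypothetical names, and Thread.sleep is used as a stand-in for an answer that holds the calling thread until it finishes. It is not mockk's actual code.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Stand-in for an answer that blocks its thread until it finishes
// (Thread.sleep is used for illustration; this is not mockk's code).
fun blockingAnswer() {
    Thread.sleep(200)
}

// A hand-written answer that suspends instead of blocking.
suspend fun suspendingAnswer() {
    delay(200)
}

// Starts three coroutines on runBlocking's single thread and returns
// the elapsed wall-clock time in milliseconds.
fun timeThree(answer: suspend () -> Unit): Long = runBlocking {
    measureTimeMillis {
        List(3) { async { answer() } }.awaitAll()
    }
}

fun main() {
    println("blocking:   ${timeThree { blockingAnswer() }} ms")
    println("suspending: ${timeThree { suspendingAnswer() }} ms")
}
```

With the blocking variant, each coroutine occupies the only thread for its whole duration, so the three calls add up. The suspending variant frees the thread while waiting, which is why a hand-written suspending fake may be a workable alternative to a blocking answer in a timing test like the one in the question.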

Upvotes: 2
