Reputation: 589
I have a List of parameters for executing the downloads.
I map the elements of that list into Deferred values that execute the downloads; then, for each element of the List, I call await, but apparently the downloads are executed sequentially.
This is my function:
suspend fun syncFiles() = coroutineScope {
    remoteRepository.requiredFiles()
        .filter { localRepository.needToDownload( it.name, it.md5 ) }
        .map { async { downloader( it ) } }
        .forEach { deferredResult ->
            when ( val result = deferredResult.await() ) {
                is DownloadResult.Layout -> localRepository.storeLayout( result.content )
                is DownloadResult.StringR -> localRepository.storeFile( result )
            }
        }
}
This is my test:
private val useCase = SyncUseCaseImpl.Factory(
    mockk { // downloader
        coEvery { this@mockk.invoke( any() ) } coAnswers { delay( 1000 ); any() }
    },
    ...
).newInstance()
@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
    val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
    assert( requiredFilesCount ).isEqualTo( 3 )
    val time = measureTimeMillis {
        useCase.syncFiles()
    }
    assert( time ).isBetween( 1000, 1100 )
}
And this is my result: expected to be between:<1000L> and <1100L> but was:<3081L>
I find this weird, because the following two dummy tests pass; maybe I am missing something?
@Test // OK
fun test() = runBlocking {
    val a = async { delay( 1000 ) }
    val b = async { delay( 1000 ) }
    val c = async { delay( 1000 ) }
    val time = measureTimeMillis {
        a.await()
        b.await()
        c.await()
    }
    assert( time ).isBetween( 1000, 1100 )
}
@Test // OK
fun test() = runBlocking {
    val wasteTime: suspend () -> Unit = { delay( 1000 ) }
    suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
        .map { async { it() } }
        .forEach { it.await() }
    val time = measureTimeMillis {
        wasteTimeConcurrently()
    }
    assert( time ).isBetween( 1000, 1100 )
}
Upvotes: 1
Views: 2021
Reputation: 476
This may happen if the job blocks its thread. An IO-bound task that blocks, for example, holds the whole thread and so stalls every other coroutine scheduled on it. If you are using Kotlin on the JVM, try calling async(Dispatchers.IO) { } to run the coroutine on the IO dispatcher, which is backed by a pool of threads intended for blocking work, so one blocked job does not hold up the others.
Have a look here for other dispatchers: https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#dispatchers-and-threads
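To make the suggestion above concrete, here is a minimal sketch. It assumes a hypothetical blockingDownload function that holds its thread with Thread.sleep; nothing here is taken from the question's code. It compares async on the dispatcher inherited from runBlocking with async(Dispatchers.IO):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a download that blocks its thread (e.g. blocking I/O).
fun blockingDownload(id: Int): Int {
    Thread.sleep(1000)
    return id
}

fun main() = runBlocking {
    // async inherits runBlocking's single-threaded dispatcher here,
    // so the three blocking calls have to run one after another.
    val inherited = measureTimeMillis {
        (1..3).map { async { blockingDownload(it) } }.awaitAll()
    }

    // On Dispatchers.IO the blocking calls can run on separate pooled threads.
    val onIo = measureTimeMillis {
        (1..3).map { async(Dispatchers.IO) { blockingDownload(it) } }.awaitAll()
    }

    println("inherited dispatcher: $inherited ms, Dispatchers.IO: $onIo ms")
}
```

The first measurement should come out near three seconds and the second near one. Note that this only helps when the work really blocks a thread; a job that suspends with delay does not need a different dispatcher to run concurrently.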
Upvotes: 1
Reputation: 6148
The problem is in mockk.
If you look at the code of the coAnswers function, you will find this (API.kt + InternalPlatformDsl.kt):
infix fun coAnswers(answer: suspend MockKAnswerScope<T, B>.(Call) -> T) = answers {
    InternalPlatformDsl.runCoroutine {
        answer(it)
    }
}
And runCoroutine looks like this:
actual fun <T> runCoroutine(block: suspend () -> T): T {
    return runBlocking {
        block()
    }
}
As you can see, coAnswers is a non-suspending function and starts a new coroutine with runBlocking.
Let's look at an example:
val mock = mockk<Downloader> {
    coEvery {
        this@mockk.download()
    } coAnswers {
        delay(1000)
    }
}
val a = async {
    mock.download()
}
When mockk executes the coAnswers block (the delay() call), it starts an artificial coroutine scope, executes the given block, and waits until the block has finished, blocking the current thread via runBlocking. The answer block therefore only returns after delay(1000) has finished.
This means that all coroutines that go through coAnswers are executed sequentially.
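The effect of holding a thread with runBlocking can be reproduced without mockk. The following is only an isolated sketch, not mockk's code: blockingAnswer and suspendingAnswer are hypothetical stand-ins, and the coroutines are deliberately pinned to a single-threaded executor (an assumption made here so the blocking is easy to observe).

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical stand-in: a regular (non-suspending) function that bridges
// into coroutines with runBlocking, holding the calling thread until done.
fun blockingAnswer() {
    runBlocking { delay(1000) }
}

// A genuinely suspending equivalent, for comparison.
suspend fun suspendingAnswer() {
    delay(1000)
}

fun main() {
    // Assumption for this sketch: all coroutines share one worker thread.
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { single ->
        runBlocking(single) {
            val viaRunBlocking = measureTimeMillis {
                List(3) { async { blockingAnswer() } }.awaitAll()
            }
            val viaSuspend = measureTimeMillis {
                List(3) { async { suspendingAnswer() } }.awaitAll()
            }
            println("runBlocking inside async: $viaRunBlocking ms, plain delay: $viaSuspend ms")
        }
    }
}
```

With the worker thread held by each blockingAnswer call in turn, the first measurement should land near three seconds, while the suspending version should finish in about one. Exact behaviour may differ with other dispatchers or library versions.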
Upvotes: 2