Reputation: 6203
I have a simple test for a crawler that is supposed to call the repository 40 times:
@Test
fun testX() {
    // ...
    runBlocking {
        crawlYelp.concurrentCrawl()
        // Thread.sleep(5000) // works if I un-comment
    }
    verify(restaurantsRepository, times(40)).saveAll(restaurants)
    // ...
}
and this implementation:
suspend fun concurrentCrawl() {
    cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }
}
But... I get this:
Wanted 40 times:
-> at ....testConcurrentCrawl(CrawlYelpTest.kt:46)
But was 30 times:
(The 30 changes on every run, so it seems the test is not waiting.)
Why does it pass when I add the sleep? It should not be needed, given that I use runBlocking.
BTW, I have a controller that is supposed to be kept asynchronous:
@PostMapping("crawl")
suspend fun crawl(): String {
    crawlYelp.concurrentCrawl()
    return "crawling" // this is supposed to be returned right away
}
Thanks
Upvotes: 3
Views: 2623
Reputation: 754
You could also just use MockK for this (and so much more).
MockK's verify has a timeout: Long parameter specifically for handling these races in tests.
You could leave your production code as it is, and change your test to this (assuming restaurantsRepository is a MockK mock rather than a Mockito one):
import io.mockk.verify

@Test
fun `test X`() = runBlocking {
    // ...
    crawlYelp.concurrentCrawl()
    verify(exactly = 40, timeout = 5000L) {
        restaurantsRepository.saveAll(restaurants)
    }
    // ...
}
If the verify succeeds at any point within 5 seconds, it passes and the test moves on. Otherwise, the verify (and the test) fails.
Upvotes: 3
Reputation: 5302
runBlocking waits for the suspend functions called inside it, and for coroutines launched in its own scope, to finish. But concurrentCrawl just starts new jobs with GlobalScope.async, and those jobs are not children of the runBlocking scope. So concurrentCrawl, and therefore runBlocking, is done as soon as all jobs have been started, not after all of these jobs have finished.
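The difference can be reproduced without the crawler. The following is a minimal sketch, not the asker's code: fireAndForget and structured are hypothetical names, delay(100) stands in for the scraping, and an AtomicInteger stands in for the repository calls. It assumes kotlinx.coroutines 1.5 or later (for the DelicateCoroutinesApi opt-in). The coroutineScope variant is an alternative to GlobalScope that this answer does not use; it is shown only to contrast jobs that are children of the caller with jobs that are not.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical stand-in for the crawler: starts 40 jobs in GlobalScope
// and returns as soon as they have been started.
@OptIn(DelicateCoroutinesApi::class)
fun fireAndForget(counter: AtomicInteger): List<Job> =
    (1..40).map {
        GlobalScope.launch {
            delay(100)                // stands in for the scraping
            counter.incrementAndGet() // stands in for saveAll
        }
    }

// Same work, but the jobs are children of the caller's scope,
// so coroutineScope only returns once all of them have completed.
suspend fun structured(counter: AtomicInteger) = coroutineScope {
    repeat(40) {
        launch {
            delay(100)
            counter.incrementAndGet()
        }
    }
}

fun main() {
    val a = AtomicInteger()
    runBlocking { fireAndForget(a) }
    println("GlobalScope: ${a.get()} of 40 done when runBlocking returns")

    val b = AtomicInteger()
    runBlocking { structured(b) }
    println("coroutineScope: ${b.get()} of 40 done when runBlocking returns")
}
```

The second count is always 40. The first depends on timing and will usually be lower, which is the same race the failing test shows.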
You have to wait for all jobs started with GlobalScope.async to finish, like this:
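suspend fun concurrentCrawl() {
    // flatMap starts the jobs for every city before awaiting any of them
    cities.flatMap { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }.awaitAll() // suspend until every job has finished
}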
If you want to wait for concurrentCrawl() to finish outside of concurrentCrawl(), then you have to pass the Deferred results to the calling function, as in the following example. In that case the suspend keyword can be removed from concurrentCrawl().
fun concurrentCrawl(): List<Deferred<Unit>> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                println("hallo world $start")
            }
        }
    }.flatten()
}

runBlocking {
    concurrentCrawl().awaitAll()
}
As mentioned in the comments: in this case the result of async is never used, so it is better to use launch instead:
fun concurrentCrawl(): List<Job> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.launch {
                println("hallo world $start")
            }
        }
    }.flatten()
}

runBlocking {
    concurrentCrawl().joinAll()
}
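Returning the jobs also lets each caller decide whether to wait, which fits the controller in the question. The following self-contained sketch shows both call sites. It makes several assumptions: the four-element cities list, the AtomicInteger counter (standing in for the repository) and the functions crawlEndpoint and crawlAndWait are hypothetical, and delay(10) replaces the real scraping. It also assumes kotlinx.coroutines 1.5 or later for the opt-in annotation.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical stand-in for the question's list of cities.
val cities = listOf("a", "b", "c", "d")

// Starts 10 jobs per city and hands them back instead of waiting for them.
@OptIn(DelicateCoroutinesApi::class)
fun concurrentCrawl(saveCalls: AtomicInteger): List<Job> =
    cities.flatMap { loc ->
        (1..10).map { start ->
            GlobalScope.launch {
                delay(10)                   // stands in for scrapYelp.scrap(loc, start * 10)
                saveCalls.incrementAndGet() // stands in for restaurantsRepository.saveAll(rests)
            }
        }
    }

// Controller-style caller: starts the crawl and returns without waiting.
fun crawlEndpoint(saveCalls: AtomicInteger): String {
    concurrentCrawl(saveCalls)
    return "crawling"
}

// Test-style caller: waits for every job before reading the count.
fun crawlAndWait(): Int = runBlocking {
    val saveCalls = AtomicInteger()
    concurrentCrawl(saveCalls).joinAll()
    saveCalls.get()
}
```

With this shape, a test can join the returned jobs and then verify the 40 calls without any sleep, while the controller ignores the returned list and answers immediately.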
Upvotes: 3