Luís Soares

Reputation: 6203

Testing coroutines in Kotlin

I have this simple test about a crawler that is supposed to call the repo 40 times:

@Test
fun testX() {
    // ...
    runBlocking {
        crawlYelp.concurrentCrawl()
        // Thread.sleep(5000) // works if I un-comment
    }
    verify(restaurantsRepository, times(40)).saveAll(restaurants)
    // ...
}

and this implementation:

suspend fun concurrentCrawl() {
    cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }
}

But... I get this:

Wanted 40 times:
-> at ....testConcurrentCrawl(CrawlYelpTest.kt:46)
But was 30 times:

(The count changes from run to run, so it seems the test is not waiting.)

Why does it pass when I add the sleep? It should not be needed, given that I use runBlocking.

BTW, I have a controller that is supposed to be kept asynchronous:

@PostMapping("crawl")
suspend fun crawl(): String {
    crawlYelp.concurrentCrawl()
    return "crawling" // this is supposed to be returned right away
}

Thanks

Upvotes: 3

Views: 2623

Answers (2)

RBusarow

Reputation: 754

You could also just use MockK for this (and so much more).

MockK's verify has a timeout: Long parameter (in milliseconds) specifically for handling these races in tests.

You could leave your production code as it is, and change your test to this (restaurantsRepository has to be a MockK mock rather than a Mockito one):

import io.mockk.verify

@Test
fun `test X`() = runBlocking {
   // ... 

   crawlYelp.concurrentCrawl()

   verify(exactly = 40, timeout = 5000L) {
      restaurantsRepository.saveAll(restaurants)
   }
   // ...
}

If the verification succeeds at any point within 5 seconds, the test passes and moves on. Otherwise, the verification (and the test) fails.
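To illustrate the idea behind a verification with a timeout, here is a minimal sketch that polls a condition until it holds or a deadline passes. It is not MockK's implementation; awaitCondition, its parameters and the counter are hypothetical names, and plain threads stand in for the background saves.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: re-checks `condition` until it holds or `timeoutMs` elapses.
fun awaitCondition(timeoutMs: Long, pollMs: Long = 10, condition: () -> Boolean): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (System.nanoTime() < deadline) {
        if (condition()) return true   // succeeded early, no need to wait longer
        Thread.sleep(pollMs)
    }
    return condition()                 // one last check at the deadline
}

fun main() {
    val calls = AtomicInteger(0)
    // 40 background "saves", each finishing after a short delay
    repeat(40) {
        thread {
            Thread.sleep(100)
            calls.incrementAndGet()
        }
    }
    val ok = awaitCondition(timeoutMs = 5000) { calls.get() == 40 }
    println("all 40 calls observed: $ok")
}
```

The check returns as soon as the condition holds, so a fast run does not pay for the full timeout, unlike a fixed Thread.sleep(5000).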

Upvotes: 3

Alexander Egger

Reputation: 5302

runBlocking waits for all suspend functions called inside it to return. But concurrentCrawl only starts new jobs with GlobalScope.async; those jobs belong to GlobalScope, not to the caller, and run on a background thread pool. So concurrentCrawl, and therefore runBlocking, is done as soon as all jobs have been started, not when all of these jobs have finished.
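The effect can be reproduced in isolation. This is a minimal sketch, assuming kotlinx.coroutines is on the classpath; fireAndForget and the counter are hypothetical stand-ins for the crawler and the repository.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the crawler: starts `jobs` coroutines in GlobalScope
// and returns immediately, without waiting for any of them.
// (Recent kotlinx.coroutines versions flag GlobalScope as a delicate API with a warning.)
fun fireAndForget(counter: AtomicInteger, jobs: Int): List<Job> =
    (1..jobs).map {
        GlobalScope.launch {
            delay(200)                 // simulated scraping work
            counter.incrementAndGet()  // simulated saveAll call
        }
    }

fun main() {
    val counter = AtomicInteger(0)
    // runBlocking only waits for its own block, not for the GlobalScope jobs.
    val jobs = runBlocking { fireAndForget(counter, 40) }
    println("right after runBlocking: ${counter.get()}")
    // Waiting for the jobs explicitly makes the count reliable.
    runBlocking { jobs.joinAll() }
    println("after joinAll: ${counter.get()}")
}
```

The first count is typically below 40 (often 0) because the jobs are still running, while the second is 40 once every job has been joined.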

You have to wait for all jobs started with GlobalScope.async to finish like this:

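suspend fun concurrentCrawl() {
    // flatMap collects the jobs of all cities first, so every job is started
    // before awaitAll() begins waiting (instead of waiting city by city)
    cities.flatMap { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                val rests = scrapYelp.scrap(loc, start * 10)
                restaurantsRepository.saveAll(rests)
            }
        }
    }.awaitAll()
}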
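An alternative to awaiting GlobalScope jobs by hand is structured concurrency: coroutineScope does not return until every coroutine launched inside it has completed. The following is a sketch under assumptions, not the original crawler: the city names and the saved counter are hypothetical, and delay stands in for the scraping and saving calls.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for the question's cities and repository.
val cities = listOf("Lisbon", "Porto", "Faro", "Braga")
val saved = AtomicInteger(0)

suspend fun concurrentCrawl() {
    // coroutineScope suspends until all children launched in it are done.
    coroutineScope {
        for (loc in cities) {
            for (start in 1..10) {
                launch(Dispatchers.Default) {
                    delay(50)               // stands in for scrapYelp.scrap(loc, start * 10)
                    saved.incrementAndGet() // stands in for restaurantsRepository.saveAll(rests)
                }
            }
        }
    }
}

fun main() = runBlocking {
    concurrentCrawl()
    println(saved.get()) // prints 40: 4 cities x 10 pages, all finished
}
```

With this shape the caller always waits for the crawl. A caller that must return right away, like the controller in the question, would instead have to start concurrentCrawl() in a scope it owns rather than call it directly.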

If you want to wait for the jobs outside of concurrentCrawl(), you have to return the Deferred results to the calling function, as in the following example. In that case the suspend keyword can be removed from concurrentCrawl().

fun concurrentCrawl(): List<Deferred<Unit>> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.async {
                println("hallo world $start")
            }
        }
    }.flatten()
}


runBlocking {
    concurrentCrawl().awaitAll()
}

As mentioned in the comments: in this case the async block does not return a meaningful value, so it is better to use launch instead:

fun concurrentCrawl(): List<Job> {
    return cities.map { loc ->
        1.rangeTo(10).map { start ->
            GlobalScope.launch {
                println("hallo world $start")
            }
        }
    }.flatten()
}

runBlocking {
    concurrentCrawl().joinAll()
}

Upvotes: 3
