Sergey Nikulitsa

Reputation: 113

Async/await with Kotlin coroutines from blocking code

I'm using Spring Boot without the reactive web stack.

I'm trying to run some requests asynchronously with Kotlin coroutines:

    @GetMapping
    fun test(): Message {
        val restTemplate = RestTemplate()
        return runBlocking {
            val hello = async { hello(restTemplate) }
            val world = async { world(restTemplate) }
            Message("${hello.await()} ${world.await()}!")
        }
    }

    private suspend fun world(restTemplate: RestTemplate): String {
        logger.info("Getting WORLD")
        return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
    }

    private suspend fun hello(restTemplate: RestTemplate): String {
        logger.info("Getting HELLO")
        return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
    }

But this code runs sequentially.

How can I fix it?

Upvotes: 4

Views: 5585

Answers (4)

Onema

Reputation: 7582

TL;DR: Use async with an explicit dispatcher such as Dispatchers.IO, which is designed for offloading blocking IO.

val hello = async(Dispatchers.IO) { hello(restTemplate) }
val world = async(Dispatchers.IO) { world(restTemplate) }

Update: I was informed in the Kotlin coroutines Slack channel that I can use async(Dispatchers.IO) rather than async + withContext(Dispatchers.IO).

I took @Sergey Nikulitsa's code and created an extension function that takes a lambda with a receiver (similar to async) to combine async and withContext(Dispatchers.IO).

import kotlinx.coroutines.*

fun <T> CoroutineScope.myRestTemplateAsync(
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {

    return async(Dispatchers.IO, start) {
        block() 
    }
}

Then it can be used in your code like so:


@GetMapping
fun test(): Message {
    val restTemplate = RestTemplate()
    return runBlocking {
        val hello = myRestTemplateAsync { hello(restTemplate) }
        val world = myRestTemplateAsync { world(restTemplate) }
        Message("${hello.await()} ${world.await()}!")
    }
}

private suspend fun world(restTemplate: RestTemplate): String {
    logger.info("Getting WORLD")
    return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
}

private suspend fun hello(restTemplate: RestTemplate): String {
    logger.info("Getting HELLO")
    return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
} 

Preliminary results

At this point, I am only experimenting with this method, using only Spring WebMVC and RestTemplate, for 5+ calls.

The myRestTemplateAsync extension function has consistently reduced execution times by 30% to 50% compared to its synchronous counterpart.

Why this works over just using async { }?

For the RestTemplate specifically, using async {...} within the coroutineScope didn't seem to make a difference, and execution times were on par with the synchronous code.

Furthermore, looking at the threads in the profiler, no "Dispatcher Worker" threads were created when using async by itself. This leads me to believe that the blocking RestTemplate call was holding the one thread the coroutines shared: inside runBlocking, async without a dispatcher inherits the caller's single-threaded event loop, so the second call cannot start until the first returns.

When a dispatcher is specified in async, the coroutine (and its function block) runs on a thread from the Dispatchers.IO thread pool instead.

In this case, the code block should contain the RestTemplate call (a single call). As far as I can tell, this prevents the RestTemplate from blocking the original context.
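To see the difference without a running HTTP service, here is a minimal sketch that swaps the RestTemplate call for a hypothetical blockingCall function that just sleeps for 500 ms. The function names and the sleep duration are my own assumptions for illustration, not part of the original code. It requires kotlinx-coroutines-core on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a blocking RestTemplate call.
fun blockingCall(name: String): String {
    Thread.sleep(500) // blocks the current thread, like java.io based HTTP
    return "$name on ${Thread.currentThread().name}"
}

// Plain async { } inherits runBlocking's single-threaded event loop,
// so both blocking calls end up on the calling thread, one after the other.
fun timeWithDefaultAsync(): Long = measureTimeMillis {
    runBlocking {
        val hello = async { blockingCall("hello") }
        val world = async { blockingCall("world") }
        println("${hello.await()} / ${world.await()}")
    }
}

// async(Dispatchers.IO) moves each blocking call to its own pool thread,
// so the two sleeps overlap.
fun timeWithIoAsync(): Long = measureTimeMillis {
    runBlocking {
        val hello = async(Dispatchers.IO) { blockingCall("hello") }
        val world = async(Dispatchers.IO) { blockingCall("world") }
        println("${hello.await()} / ${world.await()}")
    }
}

fun main() {
    println("async { }: ${timeWithDefaultAsync()} ms")
    println("async(Dispatchers.IO) { }: ${timeWithIoAsync()} ms")
}
```

The first timing should come out at roughly the sum of the two sleeps and the second at roughly one sleep. The printed thread names show which threads did the work.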

Why you may want to use this method?

If you have been using RestTemplate (thread-per-request model) in a large project, it could be a difficult task to just replace it with a non-blocking client like WebClient. With this, you can continue to use most of your code, and simply add the myRestTemplateAsync in areas of your code where you could make multiple calls asynchronously.

If you are starting a new project, don't use RestTemplate. Instead, it is best to use WebFlux with coroutines in Kotlin, as explained in this article.

Is this a good idea?

At this time, I don't have enough information to say one way or the other. I hope to run more extensive tests and evaluate:

  • Memory consumption under load
  • Possible thread pool exhaustion under load
  • How exceptions are propagated and handled
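On the thread pool exhaustion point: Dispatchers.IO is capped by default (64 threads or the number of cores, whichever is larger), and one way to keep a single endpoint from taking the whole pool might be a coroutine Semaphore around each blocking call. This is only a sketch. The permit count of 4, the blockingCall stand-in, and the in-flight counters are assumptions made up for the demonstration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical cap on how many blocking calls may run at the same time.
val permits = Semaphore(4)

// Counters used only to observe the concurrency level in this demo.
val inFlight = AtomicInteger(0)
val maxInFlight = AtomicInteger(0)

// Hypothetical stand-in for a blocking RestTemplate call.
fun blockingCall(id: Int): Int {
    val now = inFlight.incrementAndGet()
    maxInFlight.updateAndGet { maxOf(it, now) }
    Thread.sleep(50)
    inFlight.decrementAndGet()
    return id
}

// Starts one coroutine per id, but lets at most 4 of them block a
// Dispatchers.IO thread at once; the others suspend while waiting for a permit.
suspend fun fetchAll(ids: List<Int>): List<Int> = coroutineScope {
    ids.map { id ->
        async(Dispatchers.IO) {
            permits.withPermit { blockingCall(id) }
        }
    }.awaitAll()
}

fun main() {
    val results = runBlocking { fetchAll((1..20).toList()) }
    println("results=$results, max concurrent calls=${maxInFlight.get()}")
}
```

Waiting for a permit suspends the coroutine rather than blocking a thread, so the queue of pending calls costs no extra threads. It does not say anything about memory use or exception handling, which would still need testing.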

If you have any comments on why this may or may not be a good idea, please post them below.

Upvotes: 5

Sergey Nikulitsa

Reputation: 113

This code runs in parallel:

    @GetMapping
    fun test(): Message {
        val restTemplate = RestTemplate()
        return runBlocking {
            val hello = async { hello(restTemplate) }
            val world = async { world(restTemplate) }
            Message("${hello.await()} ${world.await()}!")
        }
    }

    private suspend fun world(restTemplate: RestTemplate): String {
        logger.info("Getting WORLD")
        return withContext(Dispatchers.IO) {
            restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
        }
    }

    private suspend fun hello(restTemplate: RestTemplate): String {
        logger.info("Getting HELLO")
        return withContext(Dispatchers.IO) {
            restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
        }
    }

Upvotes: 4

Sergey Nikulitsa

Reputation: 113

The root cause may be:

  • RestTemplate uses java.io (not java.nio)
  • RestTemplate blocks the current thread until it gets the HTTP response
  • Coroutines cannot suspend a blocking call, so the coroutine magic doesn't work in this case

Solution:

  • Use an HTTP client that is built on java.nio
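As one possible illustration of that solution, here is a sketch using the JDK's java.net.http.HttpClient (Java 11+) with its non-blocking sendAsync, bridged into coroutines through the await extension from kotlinx.coroutines.future. The choice of client, the fetch and fetchBoth names, and returning the raw response body instead of the parsed payload are all my assumptions, not something from the question.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

val client: HttpClient = HttpClient.newHttpClient()

// Suspends until the response arrives instead of blocking the thread.
// Returns the raw body; JSON parsing is left out of this sketch.
suspend fun fetch(url: String): String {
    val request = HttpRequest.newBuilder(URI.create(url)).build()
    return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
        .await()
        .body()
}

// Both requests are in flight at the same time, even on a single thread,
// because neither call blocks while waiting.
suspend fun fetchBoth(helloUrl: String, worldUrl: String): String = coroutineScope {
    val hello = async { fetch(helloUrl) }
    val world = async { fetch(worldUrl) }
    "${hello.await()} ${world.await()}!"
}

fun main(): Unit = runBlocking {
    // URLs taken from the question; they need the local services running.
    println(fetchBoth("http://localhost:9090/hello", "http://localhost:9090/world"))
}
```

With a client like this, plain async { } inside runBlocking is enough, since no dispatcher thread is held while a request is pending.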

Upvotes: 2

  • runBlocking is designed to bridge regular blocking code to libraries that are written in suspending style, and is meant to be used in main functions and in tests.

  • Here we create a CoroutineScope with the coroutineScope function instead. This function is designed for parallel decomposition of work. When any child coroutine in this scope fails, the scope fails and all the other children are cancelled.

  • Because coroutineScope is a suspend function, we mark fun test() as suspend fun (suspend functions can only be called from a coroutine or another suspend function). Inside the CoroutineScope, we can call async and launch to start coroutines.

    @GetMapping
    suspend fun test(): Message {
        val restTemplate = RestTemplate()
        return coroutineScope {
            val hello = async { hello(restTemplate) }
            val world = async { world(restTemplate) }
            Message("${hello.await()} ${world.await()}!")
        }
    }
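The failure behaviour of coroutineScope described above can be sketched like this. The two children use delay as a stand-in for slow calls, and the helloWorldOrFail name, the timings, and the IOException are assumptions chosen for the demonstration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

// Set when the slow child leaves its try block (here: because it was cancelled).
val siblingCancelled = AtomicBoolean(false)

suspend fun helloWorldOrFail(): String = coroutineScope {
    val hello = async {
        try {
            delay(5_000) // hypothetical slow call; delay is cancellable
            "Hello"
        } finally {
            siblingCancelled.set(true)
        }
    }
    val world = async<String> {
        delay(100)
        throw IOException("world service unavailable") // simulated failure
    }
    "${hello.await()} ${world.await()}!"
}

fun main(): Unit = runBlocking {
    try {
        println(helloWorldOrFail())
    } catch (e: IOException) {
        // The scope rethrows the child's exception after cancelling the sibling.
        println("Failed: ${e.message}, sibling cancelled: ${siblingCancelled.get()}")
    }
}
```

Note that cancellation is cooperative: delay reacts to it right away, but a thread stuck inside a blocking RestTemplate call would most likely keep running until that call returns.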

Upvotes: 0
