Hantsy
Hantsy

Reputation: 9261

How to return a Kotlin Coroutines Flow in Spring reactive WebClient

Spring 5.2 brought Kotlin coroutines support, Spring reactive WebClient has got Coroutines supports in Kotlin extensions.

I have created backend service which exposes GET /posts as a Flow, check the codes here.

@GetMapping("")
fun findAll(): Flow<Post> =
        postRepository.findAll()

In the client sample, I tried to use WebClient to consume this api by the following.

@GetMapping("")
suspend fun findAll(): Flow<Post> =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody()

It failed due to a Jackson serialization of the Flow type.

Due to awaitXXX method in the above expression, I have to use a suspend modifier for this fun.

But the following is working if I changed the body type to Any, check the compelete codes.

GetMapping("")
suspend fun findAll() =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody<Any>()

After read the Kotlin Coroutines of spring ref doc, the Flux should be converted to Kotlin coroutines Flow. How to deal with return type to a Flow and remove suspend here?

Update: The return type is changed to Flow, check the latest source codes here, I think it could be part of Spring 5.2.0.M2. The suspend modifier is required for 2-stage coroutines operations in webclient api, as explained below by Sébastien Deleuze.

Upvotes: 6

Views: 6457

Answers (1)

S&#233;bastien Deleuze
S&#233;bastien Deleuze

Reputation: 6209

The first thing to understand is that returning Flow does not require using suspending functions for the handler method itself. With Flow, the suspending functions are usually isolated in lambda parameters. But in this (common) use case, due to WebClient 2 stage API (first get the response, then get the body) we need the handler method to be suspending for awaitExchange and then get the body as Flow with bodyToFlow extension:

@GetMapping("")
suspend fun findAll() =
    client.get()
        .uri("/posts")
        .accept(MediaType.APPLICATION_JSON)
        .awaitExchange()
        .bodyToFlow<Post>()

This is supported as of Spring Framework 5.2 M2 and Spring Boot 2.2 M3 (see related issue). See also my related detailed blog post.

Upvotes: 7

Related Questions