Tom Grushka
Tom Grushka

Reputation: 549

How do I properly use Kotlin Flow in Ktor streaming responses?

emphasized textI am trying to use Kotlin Flow to process some data asynchronously and in parallel, and stream the responses to the client as they occur, as opposed to waiting until all the jobs are complete.

After unsuccessfully trying to just send the flow itself to the response, like this: call.respond(HttpStatusCode.OK, flow.toList())

... I tinkered for hours trying to figure it out, and came up with the following. Is this correct? It seems there should be a more idiomatic way of sending a Flow<MyData> as a response, like one can with a Flux<MyData> in Spring Boot.

Also, it seems that using the below method does not cancel the Flow when the HTTP request is cancelled, so how would one cancel it in Ktor?

data class MyData(val number: Int)

class MyService {
    fun updateAllJobs(): Flow<MyData> =
        flow {
            buildList { repeat(10) { add(MyData(Random.nextInt())) } }
                // Docs recommend using `onEach` to "delay" elements.
                // However, if I delay here instead of in `map`, all elements are held
                // and emitted at once at the very end of the cumulative delay.
                // .onEach { delay(500) }
                .map {
                    // I want to emit elements in a "stream" as each is computed.
                    delay(500)
                    emit(it)
                }
        }
}

fun Route.jobRouter() {
    val service: MyService by inject() // injected with Koin

    put("/jobs") {
        val flow = service.updateAllJobs()
        // Just using the default Jackson mapper for this example.
        val mapper = jsonMapper { }

        // `respondOutputStream` seems to be the only way to send a Flow as a stream.
        call.respondOutputStream(ContentType.Application.Json, HttpStatusCode.OK) {
            flow.collect {
                println(it)
                // The data does not stream without the newline and `flush()` call.
                write((mapper.writeValueAsString(it) + "\n").toByteArray())
                flush()
            }
        }
    }
}

Upvotes: 2

Views: 2951

Answers (1)

Aleksei Tirman
Aleksei Tirman

Reputation: 6989

The best solution I was able to find (although I don't like it) is to use respondBytesWriter to write data to a response body channel. In the handler, a new job to collect the flow is launched to be able to cancel it if the channel is closed for writing (HTTP request is canceled):

fun Route.jobRouter(service: MyService) {
    put("/jobs") {
        val flow = service.updateAllJobs()
        val mapper = jsonMapper {}

        call.respondBytesWriter(contentType = ContentType.Application.Json) {
            val job = launch {
                flow.collect {
                    println(it)
                    try {
                        writeStringUtf8(mapper.writeValueAsString(it))
                        flush()
                    } catch (_: ChannelWriteException) {
                        cancel()
                    }
                }
            }

            job.join()
        }
    }
}

Upvotes: 2

Related Questions