Reputation: 549
I am trying to use Kotlin Flow to process some data asynchronously and in parallel, and to stream the responses to the client as they occur rather than waiting until all the jobs are complete.
After unsuccessfully trying to send the flow itself as the response, like this: call.respond(HttpStatusCode.OK, flow.toList()), I tinkered for hours trying to figure it out and came up with the following. Is this correct? It seems there should be a more idiomatic way of sending a Flow<MyData> as a response, as one can with a Flux<MyData> in Spring Boot.
Also, it seems that the method below does not cancel the Flow when the HTTP request is cancelled, so how would one cancel it in Ktor?
data class MyData(val number: Int)

class MyService {
    fun updateAllJobs(): Flow<MyData> =
        flow {
            buildList { repeat(10) { add(MyData(Random.nextInt())) } }
                // Docs recommend using `onEach` to "delay" elements.
                // However, if I delay here instead of in `map`, all elements are held
                // and emitted at once at the very end of the cumulative delay.
                // .onEach { delay(500) }
                .map {
                    // I want to emit elements in a "stream" as each is computed.
                    delay(500)
                    emit(it)
                }
        }
}
fun Route.jobRouter() {
    val service: MyService by inject() // injected with Koin
    put("/jobs") {
        val flow = service.updateAllJobs()
        // Just using the default Jackson mapper for this example.
        val mapper = jsonMapper { }
        // `respondOutputStream` seems to be the only way to send a Flow as a stream.
        call.respondOutputStream(ContentType.Application.Json, HttpStatusCode.OK) {
            flow.collect {
                println(it)
                // The data does not stream without the newline and `flush()` call.
                write((mapper.writeValueAsString(it) + "\n").toByteArray())
                flush()
            }
        }
    }
}
Upvotes: 2
Views: 2951
Reputation: 6989
The best solution I was able to find (although I don't like it) is to use respondBytesWriter to write data to the response body channel. In the handler, a new job is launched to collect the flow so that it can be canceled if the channel is closed for writing (that is, if the HTTP request is canceled):
fun Route.jobRouter(service: MyService) {
    put("/jobs") {
        val flow = service.updateAllJobs()
        val mapper = jsonMapper {}
        call.respondBytesWriter(contentType = ContentType.Application.Json) {
            val job = launch {
                flow.collect {
                    println(it)
                    try {
                        writeStringUtf8(mapper.writeValueAsString(it))
                        flush()
                    } catch (_: ChannelWriteException) {
                        cancel()
                    }
                }
            }
            job.join()
        }
    }
}
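Regarding the commented-out onEach in the question: inside the flow { } builder it is called on the List returned by buildList, so it resolves to the eager Iterable.onEach. That version runs through all ten delays before map emits anything, which would explain why the elements arrive together at the end. One possible alternative, sketched below on the assumption that kotlinx.coroutines 1.6 or later is available, is to convert the list with asFlow() first so that Flow.onEach delays each element as it is emitted. The top-level updateAllJobs() and main here are illustrative stand-ins, not the original MyService:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

data class MyData(val number: Int)

// Hypothetical stand-in for MyService.updateAllJobs().
// Converting the list to a Flow first means onEach is Flow.onEach (lazy, applied
// per element as it flows downstream) rather than Iterable.onEach (eager).
fun updateAllJobs(): Flow<MyData> =
    List(10) { MyData(Random.nextInt()) }
        .asFlow()
        .onEach { delay(500) } // suspends before each element reaches the collector

fun main() = runBlocking {
    // Each element is printed as it arrives instead of all at once at the end.
    updateAllJobs().collect { println(it) }
}
```

Because delay is a cancellable suspension point, a flow built this way should also stop promptly when the collecting coroutine is cancelled, which is what the launch/cancel arrangement above relies on.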
Upvotes: 2