Reputation: 523
I have a server that relays between two (different) clients. When the User (first client, through websockets) sends a message, the server needs to repeat this message every X milliseconds to the Device (second client) until a new message is received, or the websocket is closed.
I consume the websocket as a flow, and I've created the following operator:
    fun <T> flowEvery(value: T, everyMilliseconds: Long): Flow<T> =
        flow {
            // Emit the same value forever, pausing between emissions.
            while (true) {
                emit(value)
                delay(everyMilliseconds)
            }
        }.cancellable()

    @ExperimentalCoroutinesApi
    fun <T> Flow<T>.repeatEvery(mSec: Long): Flow<T> =
        this.flatMapLatest {
            flowEvery(it, mSec)
        }

The problem is that once the socket is closed, the last message keeps being sent forever.
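The behaviour can be reproduced without a socket. The following is only a minimal sketch, assuming a finite `flowOf("a", "b")` as a stand-in for the closed websocket; the names `tickEvery`, `repeatForever` and `completesWithin` are made up for this demo and mirror the operators above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same shape as flowEvery above (hypothetical name for the demo).
fun <T> tickEvery(value: T, periodMillis: Long): Flow<T> = flow {
    while (true) {
        emit(value)
        delay(periodMillis)
    }
}

// Same shape as repeatEvery above (hypothetical name for the demo).
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.repeatForever(periodMillis: Long): Flow<T> =
    flatMapLatest { tickEvery(it, periodMillis) }

// Returns true only if collection finishes on its own before the timeout.
suspend fun completesWithin(timeoutMillis: Long): Boolean {
    val upstream = flowOf("a", "b") // finite upstream, stands in for the closed socket
    val finished = withTimeoutOrNull(timeoutMillis) {
        upstream.repeatForever(10).collect { /* this is where sendTo would send */ }
        true
    }
    return finished == true
}

fun main() = runBlocking {
    // The upstream completes immediately, yet collection only stops because of the timeout.
    println(completesWithin(200))
}
```

The upstream is done right after emitting "b", but the inner flow started for "b" never completes, so the resulting flow does not complete either.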
My call site is:
    try {
        oscConnections.sendTo(
            deviceIdentifier,
            incoming.consumeAsFlow().repeatEvery(50).mapNotNull { frame ->
                when (frame) {
                    is Frame.Text -> listOf(frame.readText().toFloat())
                    else -> null
                }
            })
    } finally {
        close(CloseReason(CloseReason.Codes.NORMAL, "Ended"))
    }

The incoming channel is closed (onCompletion is called), but the stream passed to sendTo is not. sendTo itself consumes the input stream and sends a UDP message for every element it consumes.
How can I force the flow to stop?
Upvotes: 1
Views: 2189
Reputation: 4676
By using flatMapLatest or transformLatest you replace the last value of the upstream Flow with a never-ending Flow.
You must stop that Flow somehow, and CancellationExceptions are used everywhere in coroutines to signal the cancellation of coroutines. You can wrap your never-ending Flow logic in a coroutineScope to precisely cancel only that scope once the upstream flow has completed.
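    fun <T> Flow<T>.repeatEvery(delay: Long): Flow<T> =
        flow<T> {
            try {
                coroutineScope {
                    // Cancel this scope as soon as the upstream flow completes.
                    onCompletion { this@coroutineScope.cancel() }
                        .transformLatest { value ->
                            // Re-emit the latest value until a newer one arrives
                            // or the scope is cancelled.
                            while (true) {
                                emit(value)
                                delay(delay)
                            }
                        }
                        .collect(::emit)
                }
            }
            catch (e: CancellationException) {
                // done
            }
        }
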
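A different way to get the same effect, if cancelling a scope from inside the flow feels too implicit, is to turn upstream completion into a marker value. This is only a sketch of that alternative, not the coroutineScope approach above: `Done` and `repeatEveryUntilDone` are hypothetical names, and it assumes the upstream completes normally when the socket closes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical marker that is appended once the upstream completes normally.
private object Done

@OptIn(ExperimentalCoroutinesApi::class)
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.repeatEveryUntilDone(periodMillis: Long): Flow<T> =
    map<T, Any?> { it }
        // cause == null means the upstream finished without an error.
        .onCompletion { cause -> if (cause == null) emit(Done) }
        .transformLatest<Any?, T> { value ->
            // The marker cancels the previous repeating block and then
            // returns at once, so the resulting flow can complete.
            if (value === Done) return@transformLatest
            while (true) {
                emit(value as T)
                delay(periodMillis)
            }
        }

fun main() = runBlocking {
    // Finite upstream standing in for the websocket: two messages, then "closed".
    val upstream = flow {
        emit(1)
        delay(35)
        emit(2)
        delay(35)
    }
    // toList() returns only because the repeated flow now completes.
    println(upstream.repeatEveryUntilDone(10).toList())
}
```

How many times each value is repeated depends on timing, but the list starts with 1, ends with 2, and collection terminates once the upstream is done.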
PS: .cancellable() doesn't do much in your example. As per the documentation, Flows built using flow builders like flow { … } are automatically cancellable.
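To illustrate that last point, here is a small sketch, assuming (as I read the documentation) that the flow { … } builder checks for cancellation on every emit. `counter` and `collectUntil` are hypothetical names used only here; note that there is no .cancellable() and no delay in the flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A busy flow whose only suspension point is emit.
fun counter(): Flow<Int> = flow {
    var i = 0
    while (true) emit(i++)
}

// Collects counter() in a child coroutine and cancels that coroutine
// once `limit` is seen. Returns the last value that was collected.
suspend fun collectUntil(limit: Int): Int {
    var last = -1
    coroutineScope {
        val job = launch {
            counter().collect { value ->
                last = value
                if (value == limit) this@launch.cancel()
            }
        }
        job.join()
    }
    return last
}

fun main() = runBlocking {
    // The next emit after the cancel fails with a CancellationException,
    // so no value beyond the limit is collected.
    println(collectUntil(3))
}
```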
Upvotes: 2