Luftzig
Luftzig

Reputation: 523

Stopping an infinite flow

I have a server that relays between two (different) clients. When the User (first client, through websockets) sends a message, the server needs to repeat this message every X milliseconds to the Device (second client) until a new message is received, or the websocket is closed.

I consume the websocket as a flow, and I've created the following operator:

fun <T> flowEvery(value: T, everMilliSeconds: Long): Flow<T> =
    flow {
        while (true) {
            emit(value)
            delay(everMilliSeconds)
        }
    }.cancellable()

@ExperimentalCoroutinesApi
fun <T> Flow<T>.repeatEvery(mSec: Long): Flow<T> =
    this.flatMapLatest {
        flowEvery(it, mSec)
    }

Problem is, once the socket is closed the last message is kept on being sent for ever.

My call site is:

try {
    oscConnections.sendTo(
        deviceIdentifier,
        incoming.consumeAsFlow().repeatEvery(50).mapNotNull { frame ->
            when (frame) {
                is Frame.Text -> listOf(frame.readText().toFloat())
                else -> null
            }
        })
} finally {
    close(CloseReason(CloseReason.Codes.NORMAL, "Ended"))
}

the incoming channel is closed (onCompletion is called) but the stream sent to sendTo is not. sendTo it self consumes the input stream and send a UDP message for every element it consumes.

How can I force the flow to stop?

Upvotes: 1

Views: 2189

Answers (1)

fluidsonic
fluidsonic

Reputation: 4676

By using flatMapLatest or transformLatest you replace the last value of the upstream Flow with a never-ending Flow.

You must stop that Flow somehow and CancellationExceptions are used everywhere in coroutines to signal the cancellation of coroutines. You can wrap your never-ending Flow logic in a coroutineScope to precisely cancel only that scope once the upstream flow has completed.

fun <T> Flow<T>.repeatEvery(delay: Long): Flow<T> =
    flow<T> {
        try {
            coroutineScope {
                onCompletion { [email protected]() }
                    .transformLatest { value ->
                        while (true) {
                            emit(value)
                            delay(delay)
                        }
                    }
                    .collect(::emit)
            }
        }
        catch (e: CancellationException) {
            // done
        }
    }

PS: .cancellable() doesn't do much in your example. As per documentation Flows built using flow builders like flow { … } are automatically cancellable.

Upvotes: 2

Related Questions