Raman

Reputation: 19565

Flow that emits the last value periodically, and when a new value arrives

I want to create a Kotlin coroutines Flow that

  1. emits each value when it changes, and
  2. periodically re-emits the last available value, every x duration since the last change or last emission.

Upvotes: 4

Views: 5742

Answers (3)

Mahozad

Reputation: 24462

Here is another variation of Raman's answer:

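import kotlin.time.Duration
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.transformLatest

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.emitLatestEvery(
    duration: Duration,
    // Runs once for each new upstream value, before that value is first emitted.
    onChange: suspend FlowCollector<T?>.() -> Unit = {}
): Flow<T?> = transformLatest { value ->
    onChange()
    // Re-emit the value until transformLatest cancels this block for the next one.
    while (true) {
        emit(value)
        delay(duration)
    }
}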

An example usage:

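val ipAddresses: Flow<String> // obtained elsewhere
ipAddresses
    // Emit null as soon as the address changes, then the address every 5 seconds.
    .emitLatestEvery(5.seconds, onChange = { emit(null) })
    .mapLatest { pingIp(it) } // `it` is null right after a change
    .distinctUntilChanged()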

Upvotes: 1

Brian Yencho

Reputation: 2958

You could create a Flow that emits at a regular interval and then use combine. The combining function ignores the timer's value and simply passes along the current value of the original Flow you are interested in.

    // This is the main flow you are interested in. This uses
    // a Flow builder just as a simple example but this could
    // be any kind of Flow, like a (Mutable)StateFlow.
    val emitter = flow {
        emit("Your data")
        // ...
    }
    // This just serves as a timer.
    val timer = flow {
        while (currentCoroutineContext().isActive) {
            emit(Unit)
            delay(500)
        }
    }
    // This emits whenever either Flow emits. Because the timer
    // never completes, it keeps re-emitting the last value even
    // after "emitter" completes, until the collector is cancelled.
    combine(
        emitter,
        timer
    ) { value, _ ->
        // Always just return the value of your
        // main Flow.
        value
    }
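
To make the combine approach easier to try out, here is a minimal runnable sketch. The names tickEvery, withTicks, sampleSource and collectCombined, as well as the 100 ms and 250 ms timings, are assumptions made up for illustration and are not part of the answer above.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: an endless timer flow that ticks once per period.
fun tickEvery(period: Duration): Flow<Unit> = flow {
    while (true) {
        emit(Unit)
        delay(period)
    }
}

// Hypothetical helper: re-emits the latest upstream value on every tick.
fun <T> Flow<T>.withTicks(period: Duration): Flow<T> =
    combine(this, tickEvery(period)) { value, _ -> value }

// Hypothetical upstream: "a" right away, "b" 250 ms later, then it completes.
fun sampleSource(): Flow<String> = flow {
    emit("a")
    delay(250.milliseconds)
    emit("b")
}

// Collects the first six emissions; take(6) cancels the endless timer.
fun collectCombined(): List<String> = runBlocking {
    sampleSource()
        .withTicks(100.milliseconds)
        .take(6)
        .toList()
}

fun main() {
    println(collectCombined())
}
```

With these timings the list should hold some "a" values followed by "b" values; the exact count of each depends on scheduling. Note that the timer runs independently of the source, so a tick can follow a change after less than one full period. The transformLatest-based answers restart the interval on every change instead.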

Upvotes: 3

Raman

Reputation: 19565

This seems to work: every time a new value arrives, transformLatest cancels the block that is still running for the previous value and starts a new one. So this approach emits the value immediately and then keeps re-emitting it periodically until a new value arrives.

// `flow` is the upstream Flow; `x` is the re-emission interval.
flow.transformLatest { value ->
  while (currentCoroutineContext().isActive) {
    emit(value)
    delay(x)
  }
}
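
For anyone who wants to run this, below is a self-contained sketch of the same idea. The names repeatLatestEvery, demoSource and collectRepeated and the concrete timings are assumptions chosen for the demo; only the transformLatest loop comes from the snippet above.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical wrapper around the transformLatest loop.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.repeatLatestEvery(period: Duration): Flow<T> =
    transformLatest { value ->
        while (true) {
            emit(value)   // emitted right after a change, then once per period
            delay(period) // cancelled when upstream emits a new value
        }
    }

// Hypothetical upstream: "a" right away, "b" 250 ms later, then nothing more.
fun demoSource(): Flow<String> = flow {
    emit("a")
    delay(250.milliseconds)
    emit("b")
}

// Collects the first six emissions; take(6) cancels the endless loop.
fun collectRepeated(): List<String> = runBlocking {
    demoSource()
        .repeatLatestEvery(100.milliseconds)
        .take(6)
        .toList()
}

fun main() {
    println(collectRepeated())
}
```

The collected list should contain "a" values followed by "b" values, with "b" repeating for as long as the collector stays active. How many of each appear depends on scheduling, so treat the exact split as indicative only.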

Upvotes: 4
