self_out_ manoeuvred

Reputation: 21

How to build a flow that doesn't restart between collections

How to build a flow that doesn't restart or reset between terminal operations (e.g. collect):

fun <T> Flow<T>.asResumableFlow(): Flow<T> = ???

// Usage
val flow = flowOf(1, 2).asResumableFlow()
flow.first() // 1
flow.first() // 2, not 1
flow.firstOrNull() // null

I have a flow that must be collected multiple times, because it is collected inside a loop wrapped around other logic:

val resumableFlow: Flow<Expensive> = ...

while (condition) {
  if (mainTaskSucceded) continue
  fallback(resumableFlow.first()) // Only produce Expensive when needed
}

In most cases no fallback will be needed, so it is important that no value is computed until it is actually needed.

Constraints:

Non constraints:

I haven't found how Channel, StateFlow and MutableFlow could avoid violating those constraints, especially about not computing a value until needed.

They all have a push model, whereas my problem is better expressed as a pull. Cold flows match the pull model (started on demand), except that they reset on every restart.

My current workaround is to save the flow state in a persistent object. When the flow is restarted, it resumes where it left off:

fun resumableFlow(): Flow<Expensive> {
  val state = State()
  return flow {
     while (state.hasNext()) {
       emit(state.next())
     }
  }
}

This is far from ideal as the state machine is manual, the opposite of what coroutines normally allow.

Here is a draft asResumableFlow:

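import kotlin.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

public fun <T> Flow<T>.asResumableFlow(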
    context: CoroutineContext = EmptyCoroutineContext
): Flow<T> = ResumableFlow(this, context)

public class ResumableFlow<T>(flow: Flow<T>, context: CoroutineContext) : Flow<T> {
    /** Resume to produce a new value from `flow`. `null` when `flow` ended. */
    private var collect: Continuation<Unit>? = createCollectCoroutine(flow, context)

    /** Set each time `flow` produces a value. `null` when `flow` ended. */
    private var value: T? = null

    /** Set if `flow` ended in failure, `null` otherwise. */
    private var throwable: Throwable? = null

    private fun createCollectCoroutine(flow: Flow<T>, context: CoroutineContext): Continuation<Unit> {
        val finishContinuation = Continuation<Unit>(context) {
            collect = null
            value = null
            throwable = it.exceptionOrNull()
        }
        return suspend { suspendCollect(flow) }.createCoroutine(finishContinuation)
    }

    /** Collect [flow], suspending after each value is received. */
    private suspend fun suspendCollect(flow: Flow<T>) {
        flow.collect { value ->
            this.value = value
            suspendCoroutine {
                collect = it
            }
        }
    }

    private fun resumeCollect(): Boolean {
        collect?.resume(Unit)
        return collect != null
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        flow<T> {
            while (resumeCollect()) {
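                // Cast instead of `!!` so that a nullable T can still emit null values.
                @Suppress("UNCHECKED_CAST")
                emit(value as T)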
            }
            throwable?.let { throw it }
        }.collect(collector)
    }
}

It works, but it feels hacky given how low-level it is.

Upvotes: 0

Views: 230

Answers (2)

BladeCoder

Reputation: 12949

If you want the consumer to drive the production of the generated values, then you should probably use a Sequence and not a Flow. See: sequence() function documentation.
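A minimal sketch of that idea, using only the standard library. `Expensive`, `expensiveSequence`, `Resumable` and the `computed` counter are hypothetical names made up for illustration. The key point is to keep a single Iterator around: each call continues where the previous one stopped, and nothing is computed until a value is requested. One caveat: the sequence builder uses restricted suspension, so the block cannot call arbitrary suspend functions, only yield and yieldAll.

```kotlin
// Hypothetical stand-in for the question's Expensive type.
data class Expensive(val id: Int)

// Counts how many values were actually computed, to make the laziness visible.
var computed = 0

fun expensiveSequence(): Sequence<Expensive> = sequence {
    for (id in 1..2) {
        computed++           // runs only when the consumer asks for the next value
        yield(Expensive(id)) // suspends here until the next value is requested
    }
}

/** Holds one Iterator so that successive reads continue instead of restarting. */
class Resumable<T>(source: Sequence<T>) {
    private val iterator = source.iterator()

    /** Returns the next value, or null once the sequence is exhausted. */
    fun nextOrNull(): T? = if (iterator.hasNext()) iterator.next() else null
}

fun main() {
    val resumable = Resumable(expensiveSequence())
    println(computed)               // 0
    println(resumable.nextOrNull()) // Expensive(id=1)
    println(computed)               // 1
    println(resumable.nextOrNull()) // Expensive(id=2)
    println(resumable.nextOrNull()) // null
}
```

In the loop from the question, `fallback(resumableFlow.first())` would become something like `resumable.nextOrNull()?.let { fallback(it) }`.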

Upvotes: 0

tyg

Reputation: 15763

There are two types of flows: cold flows and hot flows. Regular flows generated by flowOf(), for example, are cold. That means they are only activated when collect (or any other terminal operator) is applied. That also means that the flow is recreated from scratch each time, so each collector receives values independently of other collectors.

Hot flows, on the other hand, run independently of the collectors. That means when a new collector subscribes to the flow, it receives the value that is already there and continues with the values that are emitted from then on. That also means that multiple collectors receive the same values: they share the flow. Hot flows are usually of type SharedFlow or of a subtype like StateFlow.

You can create such a hot flow with MutableSharedFlow() or MutableStateFlow(). You can also easily turn an existing cold flow into a hot flow by calling shareIn() or stateIn(), respectively. There are several ways to configure a hot flow: to pause when there are no collectors, to buffer a certain number of values, to replay old values to new collectors, to skip repeated values, and so on.

Most use cases can be realized by a properly configured shared flow. One thing to note, though: Flows decide themselves when to generate new values, even cold flows. You can collect those values if you are interested in them, but flows are not designed to only produce values one after another, when a collector requests the next one. So your example with calling first() twice (as in, give me the first value, then give me the second value) isn't how flows behave in general. You usually would have a collector with a lambda instead that is called each time a new value is available. When and where you process the values is then up to you.

If you really need to request one value at a time you should use a Channel instead of a flow. Channels provide a more primitive, low-level API that flows use themselves under the hood.
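A minimal sketch of the Channel approach, assuming the kotlinx.coroutines library is on the classpath. The `numbers` function is a hypothetical example producer. With the default rendezvous capacity the producer suspends in send until a receiver arrives, so values are handed over one at a time. Note that the producer still runs until it reaches its next send, so it may compute one value ahead of the consumer.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends two values over a rendezvous channel, then closes it.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    for (i in 1..2) {
        send(i) // suspends until a receiver takes the value
    }
}

fun main() = runBlocking {
    val channel = numbers()
    // receiveCatching() does not throw on a closed channel; getOrNull() maps that case to null.
    println(channel.receiveCatching().getOrNull()) // 1
    println(channel.receiveCatching().getOrNull()) // 2, not 1
    println(channel.receiveCatching().getOrNull()) // null
}
```

Unlike a cold flow, the channel keeps its position between reads, which matches the first() / first() / firstOrNull() usage from the question.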

You can read more about the differences of Channels and Flows and when to use what here: https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c

Upvotes: 1
