Shadow

Reputation: 4743

Collect flow but only any new values, not the currently existing value

I am currently struggling with this one, and so far no combination of SharedFlow and StateFlow has worked.

I have a flow that might have already started with a value, or not.

Using that flow I want to collect any new values that are emitted after I start collecting.

So far every attempt has failed: no matter what I try, the collector receives the current value as soon as I start collecting.

An example of what I am trying to achieve:

I want to be able to do the following:

I tried SharedFlow and StateFlow, and I tried replay = 0 and WhileSubscribed. No combination I could find does what I am looking for.

The only workaround I have found so far is to record the time at which I start my .collect { } and compare it with the start time of each item I receive in the collector. That works here because the object I am using carries its own origin time, but it will not work in general, for example with the plain Integers in the example above.

EDIT: Adding implementation example as requested for SharedFlow

This is tied to a Room database call that returns a Flow<MyObject>

MyFragment.kt

lifecycleScope.launch(Dispatchers.IO) {
    viewModel.getMyObjectFlow.shareIn(
        viewModel.viewModelScope, // also tried with fragment lifecyclescope
        SharingStarted.WhileSubscribed(), // also tried with the other 2 options
        replay = 0,
    ).collect{
        ...
    }
}

Upvotes: 1

Views: 7594

Answers (2)

broot

Reputation: 28312

You have a misconception about how flows work. They always emit only after you start collecting; they emit on demand. Consider this example:

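import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking {
    // Cold flow: the body does not run until someone collects it.
    val flow1 = flow {
        println("Emitting 1")
        emit(1)
        delay(10.seconds)
        println("Emitting 2")
        emit(2)
    }

    // Wait before collecting; nothing is emitted during this time.
    delay(5.seconds)

    println("Start collecting")
    flow1.collect {
        println("Collected: $it")
    }
}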

The output is:

Start collecting
Emitting 1
Collected: 1

not:

Emitting 1
Start collecting
Collected: 1

This is because a flow starts emitting only after you start collecting it. Otherwise, it would have nowhere to emit.

Of course, there are flows that emit from some kind of cache, queue or buffer. Shared flows, for example, do this. In that case it looks as if you collect after the emission happened, but this is not really the case. Technically speaking, it works like this:

val buffer = listOf(1, 2, 3)
val flow1 = flow {
    buffer.forEach {
        println("Emitting $it")
        emit(it)
    }
}

It still emits after you start collecting; it just emits from the cache. Of course, the item was added to the cache before you started collecting, but this is entirely abstracted away from you. You can't know why a flow emitted an item. From the collector's perspective it was always emitted just now, not in the past. Similarly, you can't know whether a web server read its data from the DB or from a cache: this is abstracted away from you.
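To make the cache behaviour concrete, here is a minimal sketch using a real MutableSharedFlow. The helper name lateCollectorSees and the 200 ms timeout are assumptions made for illustration only. It emits a value before anyone collects and then checks what a late collector receives, once with a replay cache and once without.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: emit 42 before anyone collects, then collect late.
// Returns the value the late collector sees, or null if nothing arrives in time.
fun lateCollectorSees(replay: Int): Int? = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = replay)
    shared.emit(42) // no collectors yet: the value is kept only if replay > 0
    withTimeoutOrNull(200L) { shared.first() }
}

fun main() {
    println(lateCollectorSees(replay = 1)) // the cached value is re-emitted to the late collector
    println(lateCollectorSees(replay = 0)) // nothing was cached, so the wait times out
}
```

With replay = 0 the late collector gets nothing, which is the behaviour the question asks for, but only if the shared flow already exists before the value is emitted.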

Summing up: it is not possible to collect only new items from just any flow in a universal way. Flows in general don't understand the concept of "new items". They just emit, but you don't know why they do so. Maybe they generate items on the fly, maybe they passively observe external events, or maybe they retransmit items that they collected from another flow. You don't know that.

While developing your solution, you need to understand what the source of the items is and write your code accordingly. For example, if the source is a regular cold flow, then it never starts doing anything before you start collecting. If the source is a state flow, you can just drop the first item. If it is a shared flow or a flow with some replay buffer, then the situation is more complicated.
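For the state flow case, dropping the first item can be sketched with the drop operator. This is an illustration under assumptions, not a general solution: the function name and the values are made up, and the yield() calls are there only so that the single-threaded runBlocking loop lets the collector run between updates.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns only the values set after collection started.
fun collectOnlyNewValues(): List<Int> = runBlocking {
    val state = MutableStateFlow(0) // 0 is the "currently existing" value
    val received = mutableListOf<Int>()

    // UNDISPATCHED makes the collector subscribe before launch returns.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        // A StateFlow always delivers its current value first; drop(1) skips it.
        state.drop(1).collect { received += it }
    }

    state.value = 1
    yield() // give the collector a turn
    state.value = 2
    yield()

    job.cancel()
    received
}

fun main() {
    println(collectOnlyNewValues())
}
```

Note that a StateFlow conflates values, so drop(1) skips whatever value is current at the moment of subscription, and fast intermediate updates can still be missed.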

One possible approach would be to start collecting earlier than we need to, initially ignore all collected items, and at some point start processing them. But this is still far from perfect, and it may not work as we expect.
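The "collect early, ignore at first" idea could look roughly like the sketch below. The AtomicBoolean flag, the function name, and the replay = 1 source are all assumptions chosen for the demonstration; a real source may behave differently, which is exactly the caveat above.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: collects from the start but only records items
// received after the `processing` flag has been switched on.
fun collectAfterSwitch(): List<Int> = runBlocking {
    val source = MutableSharedFlow<Int>(replay = 1)
    source.emit(1) // sits in the replay cache and is replayed to late collectors

    val processing = AtomicBoolean(false)
    val processed = mutableListOf<Int>()

    // Start collecting early; the replayed 1 arrives while the flag is still off.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        source.collect { if (processing.get()) processed += it }
    }

    processing.set(true) // from now on, items are treated as "new"
    source.emit(2)
    source.emit(3)
    yield() // let the collector finish handling what was emitted

    job.cancel()
    processed
}

fun main() {
    println(collectAfterSwitch())
}
```

The weak point is deciding when to flip the flag: an item that is still in flight at that moment may be classified either way.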

Upvotes: 3

Tenfour04

Reputation: 93581

It doesn't make sense to use shareIn at the use site like that. You're creating a shared Flow that cannot be shared because you don't store the reference for other classes to access and use.

Anyway, the problem is that you are creating the SharedFlow at the use site, so your shared flow only begins collecting from upstream when the fragment calls this code. If the upstream flow is cold, then you will be getting the first value emitted by the cold flow.

The SharedFlow should be created in the ViewModel and put in a property so each Fragment can collect from the same instance. You'll want to use SharingStarted.Eagerly to prevent the cold upstream flow from restarting from the beginning when there are new subscribers after a break.
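As a rough plain-Kotlin sketch of that arrangement, the class below stands in for the ViewModel and its scope parameter stands in for viewModelScope. MyObjectHolder, updates, and the Int payload are hypothetical names used only for this illustration. The two CompletableDeferred gates exist purely to make the timing of the demonstration deterministic.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for a ViewModel: the shared flow is created once and stored in a property.
class MyObjectHolder(scope: CoroutineScope, upstream: Flow<Int>) {
    val updates: SharedFlow<Int> =
        upstream.shareIn(scope, SharingStarted.Eagerly, replay = 0)
}

// Hypothetical helper: returns what a subscriber sees when it joins after the first emission.
fun lateSubscriberValues(): List<Int> = runBlocking {
    val firstEmitted = CompletableDeferred<Unit>()
    val gate = CompletableDeferred<Unit>()
    val upstream = flow {
        emit(1) // emitted before anyone subscribes; not kept because replay = 0
        firstEmitted.complete(Unit)
        gate.await() // wait until the late subscriber is in place
        emit(2)
    }

    // Separate Job so the sharing coroutine is not a child of runBlocking.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val holder = MyObjectHolder(sharingScope, upstream)
    firstEmitted.await() // the "existing" value has already gone by

    // UNDISPATCHED makes the subscriber register before async returns.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        holder.updates.take(1).toList()
    }
    gate.complete(Unit)

    val result = received.await()
    sharingScope.cancel()
    result
}

fun main() {
    println(lateSubscriberValues())
}
```

Because the shared flow starts eagerly and keeps no replay cache, the late subscriber only sees what the upstream emits after it has subscribed.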

Upvotes: 0
