Citut
Citut

Reputation: 907

Kotlin - How To Collect X Values From a Flow?

Let's say I have a flow that is constantly sending updated like the following:

locationFlow = StateFlow<Location?>(null)

I have a use-case where after a particular event occurs, I want to collect X values from the flow and continue, so something like what I have below. I know that collect is a terminal operator, so I don't think the logic I have below works, but how could I do this in this case? I'd like to collect X items, save them, and then send them to another function for processing/handling.

fun onEventOccurred() {
     launch {
         val locations = mutableListOf<Location?>()
         locationFlow.collect {
               //collect only X locations
               locations.add(it)
         }
         saveLocations(locations)
     }
}

Is there a pre-existing Kotlin function for something like this? I'd like to collect from the flow X times, save the items to a list, and pass that list to another function.

Upvotes: 1

Views: 1112

Answers (2)

Tenfour04
Tenfour04

Reputation: 93561

It doesn't matter that collect is terminal. The upstream StateFlow will keep behaving normally because StateFlows don't care what their collectors are doing. you can use the take function to get a specific number of items, and you can use toList() (another terminal function) to concisely copy them into a list once they're all ready.

fun onEventOccurred() {
     launch {
         saveLocations(locationFlow.take(5).toList())
     }
}

Upvotes: 3

user2340612
user2340612

Reputation: 10704

If I understood correctly your use case, you want to:

  • discard elements until a specific one is sent – actually, after re-reading your question I don't think this is the case.. I'm leaving it in the example just FYI
  • when that happens, you want to collect X items for further processing

Assuming that's correct, you can use a combination of dropWhile and take, like so:

fun main() = runBlocking {
    val messages = flow {
        repeat(10) {
            println(it)
            delay(500)
            emit(it)
        }
    }

    messages
        .dropWhile { it < 5 }
        .take(3)
        .collect { println(it) } // prints 5, 6, 7
}

You can even have more complex logic, i.e. discard any number that's less than 5, and then take the first 10 even numbers:

fun main() = runBlocking {
    val messages = flow {
        repeat(100) {
            delay(500)
            emit(it)
        }
    }

    messages
        .dropWhile { it < 5 }
        .filter { it % 2 == 0}
        .take(10)
        .collect { println(it) } // prints even numbers, 6 to 24
}

Upvotes: 2

Related Questions