Reputation: 10493
I have a Flow<List<Int?>>
and I want to collect this flow but only until I get a null Int. Then the flow should get cancelled. For example,
val flow = flowOf(
listOf(1, 2),
listOf(3, null),
listOf(4)
)
flow.collectUntilNull {
println(it)
}
The output that I want is:
[1, 2]
[3, null]
I know there is a function Flow.takeWhile
but it doesn't emit the value where predicate returns false. In my case I want that one also.
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
return@flow collectWhile { value ->
if (predicate(value)) {
emit(value)
true
} else {
// I want a "emit(value)" here
false
}
}
}
Since collectWhile
is internal I can't use this code. Although I guess I can copy paste that collectWhile
implementation in my code. But is there another way to do this?
Upvotes: 4
Views: 8014
Reputation: 1
I solved this way:
fun <R> Flow<R>.takeUntil(
transform: suspend FlowCollector<R>.(value: R) -> Boolean): Flow<R> {
return transformWhile {
emit(it)
!transform(it)
}
}
and
suspend fun Flow<Int>.onProcessTest(
onSuccess: ((Int) -> Unit)
) {
takeUntil {
it == 5
}.collect { value ->
onSuccess(value)
}
}
fun main(): Unit = runBlocking {
val flows = flow {
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
emit(6)
emit(7)
}
flows.onProcessTest {
println(it)
}
}
result:
1
2
3
4
5
Process finished with exit code 0
Upvotes: 0
Reputation: 9672
transformWhile
is a more flexible/generalized takeWhile
.
flow
.transformWhile {
emit(it)
myCondition(it)
}
.collect {
println(it)
}
Upvotes: 10
Reputation: 10493
So, one way that I found looks something like:
var b = true
flow.takeWhile {i -> b.also { b = myCondition(i) } }
.collect {
println(it)
}
Taking it out as an extension,
fun <T> Flow<T>.takeUntil(predicate: suspend (T) -> Boolean): Flow<T> {
var b = true
return takeWhile { i ->
b.also { b = predicate(i) }
}
}
// Usage
flow.takeUntil { myCondition(it) }
.collect {
}
Is there a better way?
Upvotes: 0