Arpit Shukla

Reputation: 10493

How to collect items from a Flow until a particular condition is met?

I have a Flow<List<Int?>> and I want to collect it only until a list containing a null Int arrives (that list included). After that, the flow should be cancelled. For example:

val flow = flowOf(
    listOf(1, 2),
    listOf(3, null),
    listOf(4)
)
flow.collectUntilNull {
    println(it)
}

The output that I want is:

[1, 2]
[3, null]

I know there is a function Flow.takeWhile, but it doesn't emit the value for which the predicate returns false. In my case I want that value as well.

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    return@flow collectWhile { value ->
        if (predicate(value)) {
            emit(value)
            true
        } else {
            // I want a "emit(value)" here
            false
        }
    }
}

Since collectWhile is internal, I can't use this code directly. I suppose I could copy the collectWhile implementation into my own code, but is there another way to do this?

Upvotes: 4

Views: 8014

Answers (3)

Keven

Reputation: 1

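I solved it this way:

fun <R> Flow<R>.takeUntil(
    transform: suspend FlowCollector<R>.(value: R) -> Boolean
): Flow<R> {
    return transformWhile {
        emit(it)        // emit first, so the value that ends the flow is still delivered
        !transform(it)  // keep going only while the stop condition is false
    }
}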

and used it like this:

suspend fun Flow<Int>.onProcessTest(
    onSuccess: ((Int) -> Unit)
) {
    takeUntil {
        it == 5
    }.collect { value ->
        onSuccess(value)
    }
}

fun main(): Unit = runBlocking {
    val flows = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
        emit(6)
        emit(7)
    }


    flows.onProcessTest {
        println(it)
    }
}

result:

1
2
3
4
5

Process finished with exit code 0

Upvotes: 0

IR42

Reputation: 9672

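transformWhile is a more flexible, generalized version of takeWhile: the lambda can emit whatever it likes and returns true to keep collecting or false to stop. Emitting before checking the condition means the value that fails the condition is still delivered.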

flow
    .transformWhile {
        emit(it)
        myCondition(it)
    }
    .collect {
        println(it)
    }
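Applied to the example from the question, this could look roughly like the sketch below. It assumes a kotlinx.coroutines version that provides transformWhile, and the helper name takeThroughFirstNull is made up for illustration, not part of the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits every list up to and including
// the first one that contains a null, then stops collecting upstream.
fun Flow<List<Int?>>.takeThroughFirstNull(): Flow<List<Int?>> =
    transformWhile { list ->
        emit(list)     // always pass the current list downstream
        null !in list  // continue only while the list has no null
    }

fun main(): Unit = runBlocking {
    val flow = flowOf(
        listOf(1, 2),
        listOf(3, null),
        listOf(4)
    )
    // Prints [1, 2] and then [3, null]; listOf(4) is never collected.
    flow.takeThroughFirstNull().collect { println(it) }
}
```

Because the check runs after emit, the list containing the null still reaches the collector, which is the behaviour takeWhile alone does not give.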

Upvotes: 10

Arpit Shukla

Reputation: 10493

So, one way that I found looks something like:

var b = true
flow.takeWhile { i -> b.also { b = myCondition(i) } }
    .collect {
        println(it)
    }

Extracting it as an extension function:

fun <T> Flow<T>.takeUntil(predicate: suspend (T) -> Boolean): Flow<T> {
    var b = true
    return takeWhile { i ->
        b.also { b = predicate(i) }
    }
}

// Usage
flow.takeUntil { myCondition(it) }
    .collect {
        println(it)
    }

Is there a better way?
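One caveat with the extension above: b is created when takeUntil is called, not when the returned flow is collected, so a second collection of the same flow would start with b already false and emit nothing. A possible variant, sketched below, keeps the flag inside a flow builder so every collection gets fresh state. The name takeWhileInclusive is hypothetical, chosen here only to avoid clashing with takeUntil.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical variant: the flag is declared inside the flow builder,
// so each collection starts again with keepGoing = true.
fun <T> Flow<T>.takeWhileInclusive(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    var keepGoing = true
    emitAll(
        this@takeWhileInclusive.takeWhile { value ->
            // Return the previous decision, then record the decision for the next value.
            keepGoing.also { keepGoing = predicate(value) }
        }
    )
}

fun main(): Unit = runBlocking {
    val limited = listOf(1, 2, 3, 4).asFlow().takeWhileInclusive { it < 2 }
    println(limited.toList()) // [1, 2]
    println(limited.toList()) // [1, 2] again, since state is per collection
}
```

As in the original extension, the predicate means "keep going", and the first value for which it returns false is still emitted.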

Upvotes: 0
