Reputation: 6958
I have a List<Flow<T>>
, and would like to generate a Flow<List<T>>
. This is almost what combine
does - except that combine waits for each and every Flow
to emit an initial value, which is not what I want. Take this code for example:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
With combine
(and hence as-is), this is the output:
[a2, b1, c]
[a2, b2, c]
Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Right now I have two work-arounds, but none of them are great... The first one is plain ugly and doesn't work with nullable types:
val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
By forcing all the flows to emit a first, irrelevant value, the combine
transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:
sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
Now this one works just fine, but still feels like I'm overdoing stuff. Is there a method that I'm missing in the coroutines library?
Upvotes: 20
Views: 21164
Reputation: 8424
I think you might be looking for .merge()
:
fun <T> Iterable<Flow<T>>.merge(): Flow<T>
fun <T> merge(vararg flows: Flow<T>): Flow<T>
Merges the given flows into a single flow without preserving an order of elements. All flows are merged concurrently, without limit on the number of simultaneously collected flows.
The default .merge()
implementation works like this
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
channelFlow {
forEach { flow ->
launch {
flow.collect { send(it) }
}
}
}
Upvotes: 2
Reputation: 29844
How about this:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
It solves a few problems:
[]
is not in the resulting FlowSo, you won't notice any implementation specific workarounds, because you don't have to deal with it during collection:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
Output:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Edit: Updated answer to handle Flows which emit null values too.
* The used low-level array is thread-safe. It's as if you are dealing with single variables.
Upvotes: 17
Reputation: 6958
I would still like to avoid mapping to an intermediary wrapper type, and as someone mentioned in the comments, the behaviour is slightly wrong (this emits an empty list at first if no arguments emitted anything yet), but this is slightly nicer than the solutions I had in mind when I wrote the question (still really similar) and works with nullable types:
inline fun <reified T> instantCombine(
flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
flow.map {
@Suppress("USELESS_CAST") // Required for onStart(null)
Holder(it) as Holder<T>?
}
.onStart { emit(null) }
}) {
it.filterNotNull()
.map { holder -> holder.value }
}
And here's a test suite that passes with this implementation:
class InstantCombineTest {
@Test
fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
assertThat(instantCombine(emptyList<Flow<String>>()).toList())
.isEmpty()
}
@Test
fun `intermediate steps are emitted`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
assertThat(instantCombine(a, b, c).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a1", "b0"),
listOf("a2", "b0"),
listOf("a2", "b1"),
listOf("a2", "b1", "c"),
listOf("a2", "b2", "c")
)
.inOrder()
}
@Test
fun `a single flow is mirrored`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a2")
)
.inOrder()
}
@Test
fun `null values are kept`() = runBlockingTest {
val a = flow {
emit("a")
emit(null)
emit("b")
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String?>(),
listOf("a"),
listOf(null),
listOf("b")
)
.inOrder()
}
}
Upvotes: 4