Andy
Andy

Reputation: 3696

Merging kotlin flows

Given 2 or more flows with the same type, is there an existing Kotlin coroutine function to merge them, like the RX merge operator?

Currently I was considering this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    val flowJobs = flows.map { flow ->
        GlobalScope.launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}

but it seems somewhat clumsy.

Upvotes: 24

Views: 38015

Answers (3)

marstran
marstran

Reputation: 28056

I'm not too familiar with flows yet, so this might be suboptimal. Anyway, I think you could create a flow of all your input flows, and then use flattenMerge to flatten them into a single flow again. Something like this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()

Edit:

The merge-function was added to kotlinx-coroutines in the 1.3.3 release. See here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

Upvotes: 23

Jacques.S
Jacques.S

Reputation: 3623

This is now (Coroutines Version 1.3.5 at time of writing) part of the Coroutines library.

You use it like this:

val flowA = flow { emit(1) } 
val flowB = flow { emit(2) }

merge(flowA, flowB).collect{ println(it) } // Prints two integers
// or:
listOf(flowA, flowB).merge().collect { println(it) } // Prints two integers

You can read more in the source code

Upvotes: 32

Lamberto Basti
Lamberto Basti

Reputation: 476

It may be late but I believe this may be a viable solution:

fun <T> combineMerge(vararg flows: Flow<T>) = flow {
    coroutineScope {
        flows.forEach {
            launch {
                it.collect {
                    emit(it)
                }
            }
        }
    }
}

fun <T> combineConcat(vararg flows: Flow<T>) = flow {
    flows.forEach {
        it.collect {
            emit(it)
        }
    }
}

Upvotes: 0

Related Questions