Ahmed Kamal

Reputation: 3000

How to merge producers in Kotlin coroutines?

With Rx, you can merge multiple sources like this:

// pseudo data repository

fun getAllData(): Flowable<DataType> {
    return getCachedData().mergeWith(getRemoteData())
}

fun getCachedData(): Flowable<DataType> {
    TODO("local database call")
}

fun getRemoteData(): Flowable<DataType> {
    TODO("network call")
}

In the code above, getAllData() emits data as soon as either of the merged Flowables emits, and then emits the other one's data once it is ready.

How can I achieve the same result using produce from Kotlin coroutines?

Upvotes: 4

Views: 2753

Answers (1)

Florian Gutmann

Reputation: 2756

You can create a combined channel with produce. Inside it, launch one coroutine per input channel; each coroutine consumes its channel and forwards every element to the combined channel.

Here's a function that merges multiple receive channels of the same type into a single one.

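import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch

/**
 * Merges multiple [channels] into one channel.
 * All elements of all channels are sent to the combined channel in the order they arrive on the input channels.
 */
fun <T> CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> {
    return produce {
        channels.forEach { channel ->
            // One child coroutine per input; the combined channel closes once all children finish.
            launch { channel.consumeEach { element -> send(element) } }
        }
    }
}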

You can use it like this:

fun main() = runBlocking<Unit> {
    val every100Ms = produce {
        repeat(10) {
            send("every 100: $it")
            delay(100)
        }
    }

    val every200Ms = produce {
        repeat(10) {
            send("every 200: $it")
            delay(200)
        }
    }

    val combined = mergeChannels(every100Ms, every200Ms)
    combined.consumeEach { println(it) }
}
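
Applied to the repository from the question, the same idea might look roughly like the sketch below. The DataType class, the delays that stand in for the database and network calls, and the producer bodies are all assumptions made for illustration; mergeChannels is repeated (with a plain for loop) only so the snippet is self-contained.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's DataType.
data class DataType(val source: String)

// Same merge helper as above, repeated so this sketch compiles on its own.
fun <T> CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> =
    produce {
        channels.forEach { channel ->
            launch { for (element in channel) send(element) }
        }
    }

// Assumption: the local database call is simulated with a short delay.
fun CoroutineScope.getCachedData(): ReceiveChannel<DataType> = produce {
    delay(50)
    send(DataType("cache"))
}

// Assumption: the network call is simulated with a longer delay.
fun CoroutineScope.getRemoteData(): ReceiveChannel<DataType> = produce {
    delay(500)
    send(DataType("remote"))
}

// Counterpart of getCachedData().mergeWith(getRemoteData()) from the question.
fun CoroutineScope.getAllData(): ReceiveChannel<DataType> =
    mergeChannels(getCachedData(), getRemoteData())

fun main() = runBlocking<Unit> {
    // The cached value arrives first, the remote value follows once it is ready.
    for (data in getAllData()) println(data)
}
```

Because the producers are extension functions on CoroutineScope, they run as children of whichever scope calls getAllData(), so cancelling that scope also cancels both sources.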

Upvotes: 7
