Reputation: 3000
Using Rx
one can merge multiple subscription sources like the following
// psudo data repository
fun getAllData(): Flowable<DataType> {
return getCachedData().mergeWith(getRemoteData())
}
fun getCachedData(): Flowable<DataType> {
// local database call
}
fun getRemoteData(): Flowable<DataType> {
// network call
}
in the code above getAllData()
will return data as soon as one of the merged Flowables
returns and then send the other once it's ready.
Question is, How can I achieve the same result using Kotlin coroutine's produce
?
Upvotes: 4
Views: 2753
Reputation: 2756
You can create a combined channel with produce
in which you launch two coroutines that consume both input channels and resend it to the combined channel.
Here's a function that merges multiple receive channels of the same type into a single one.
/**
* Merges multiple [channels] into one channel.
* All elements of all channels are sent to the combined channel in the order they arrive on the input channels.
*/
fun <T> CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<T>) : ReceiveChannel<T> {
return produce {
channels.forEach {
launch { it.consumeEach { send(it) }}
}
}
}
You can use it like this:
fun main() = runBlocking<Unit> {
val every100Ms = produce {
repeat(10) {
send("every 100: $it")
delay(100)
}
}
val every200Ms = produce {
repeat(10) {
send("every 200: $it")
delay(200)
}
}
val combined = mergeChannels(every100Ms, every200Ms)
combined.consumeEach { println(it) }
}
Upvotes: 7