Reputation: 2312
With RxJava Ive become accustomed to my repositories returning Observables of data which automatically update whenever theres an underlying change. I acheive this by simply having a subject in my repository that gets notified with the relevant change info, and observables like getAll()
go off of that.
As an example, take this psuedo code like snippet:
fun getAll(): Observable<List<Model> {
subject
.filter { isChangeRelevant(it) }
.startWith(initialChangeEvent)
.map { queryAll() }
}
Ive been curious about how and if the same thing can be acheived using coroutines only?
Upvotes: 2
Views: 1017
Reputation: 4060
You can use Kotlin Coroutines Channel
s.
If you only want your values to be emitted like a stream (so you can for-each
off of it) you can use produce
to create them (which returns a ReceiveChannel
):
fun test(): ReceiveChannel<Int>{
return produce {
send(1)
send(5)
send(100)
}
}
You can use a for-each (or consumeEach
) on the values of test()
to receive its values.
If you want your channel to be exactly like RxJava's PublishSubject, you can use ConflatedBroadCastChannel, and emit values to it:
val broadCastChannel = ConflatedBroadcastChannel<Int>()
You can use broadCastChannel.offer(value)
to send values to the channel.
To receive values from the channel you can use a simple for-each loop:
for (i in broadCastChannel.openSubscription()) {
//your values
}
Upvotes: 1