Reputation: 1258
I'm trying to gradually convert an android code base from RxJava2 to Kotlin coroutines. We are using UseCases
and Repositories
. I've converted one of the repository methods returning Observable
to be a suspend
function.
Now, there was a UseCase
using Observables.combineLatest
to combine 2 repository Observables
, one of which is the one I converted to be suspend
.
In order to still use that UseCase
function as is, I converted the suspend
function to be an observable
using kotlinx-coroutines-rx2
that provides interop between rxjava and coroutines. I'm using this method specifically.
This is how the code looks:
override fun execute(): Observable<GetFollowersResult> {
return Observables.combineLatest(
// This suspend function is not getting called
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() },
profileRepository.getProfile().toObservable()
) { followers, profile ->
// mapping code
}.subscribeOn(threadExecutor)
.map<GetFollowersResult> { page ->
// result
}
.onErrorReturn { throwable ->
// error
}
.observeOn(postExecutionThread)
.startWith(GetFollowersResult.InFlight)
}
But even when the observable returned by combineLatest
is subscribed, the suspend function inside rxObservable
doesn't get called.
Am i missing something? I cannot convert the other method to be suspend
since that method is used in quite a many places and i still want to keep the suspend
function since we need to use it in newer UseCases
.
Upvotes: 1
Views: 1462
Reputation: 13321
I suggest you replace:
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() }
with:
rxSingle(Dispatchers.IO) { profileRepository.getFollowers() }.toObservable()
as the Rx equivalent of a suspend
function is either Single
(when it returns a value) or Completable
(when it returns Unit
).
If you check the Javadoc for rxSingle
and rxObservable
you will see a subtle difference:
block
function in rxObservable
returns Unit
, meaning inside you need to use send
to emit items.block
function in rxSingle
returns T
, meaning it returns the value of the suspend function called inside.Upvotes: 4