Talha Mir

Reputation: 1258

rxObservable from coroutines-rx2 lib doesn't work with Observables.combineLatest

I'm trying to gradually convert an Android code base from RxJava2 to Kotlin coroutines. We are using UseCases and Repositories. I've converted one of the repository methods that returned an Observable into a suspend function.

One UseCase uses Observables.combineLatest to combine two repository Observables, one of which is the method I converted to a suspend function.

To keep using that UseCase function as is, I wrapped the suspend function in an Observable using kotlinx-coroutines-rx2, which provides interop between RxJava and coroutines. Specifically, I'm using rxObservable.

This is how the code looks:

override fun execute(): Observable<GetFollowersResult> {
    return Observables.combineLatest(
        // This suspend function is not getting called
        rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() },
        profileRepository.getProfile().toObservable()
    ) { followers, profile ->
        // mapping code
    }.subscribeOn(threadExecutor)
        .map<GetFollowersResult> { page ->
            // result
        }
        .onErrorReturn { throwable ->
            // error
        }
        .observeOn(postExecutionThread)
        .startWith(GetFollowersResult.InFlight)
}

But even when the observable returned by combineLatest is subscribed, the suspend function inside rxObservable doesn't get called.

Am I missing something? I cannot convert the other method to a suspend function, since it is used in quite a few places, and I still want to keep the new suspend function because we need it in newer UseCases.

Upvotes: 1

Views: 1462

Answers (1)

LordRaydenMK

Reputation: 13321

I suggest you replace:

rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() }

with:

rxSingle(Dispatchers.IO) { profileRepository.getFollowers() }.toObservable()

as the Rx equivalent of a suspend function is either Single (when it returns a value) or Completable (when it returns Unit).

If you check the Javadoc for rxSingle and rxObservable you will see a subtle difference:

  • The block function in rxObservable returns Unit, so you need to call send inside it to emit items; the value of the block's last expression is discarded.
  • The block function in rxSingle returns T, meaning it returns the value of the suspend function called inside.
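
To make the difference concrete, here is a minimal sketch of both options side by side. It assumes RxJava 2 and kotlinx-coroutines-rx2 are on the classpath; the top-level getFollowers function and its List<String> result are hypothetical stand-ins for profileRepository.getFollowers() and ProfilesPageDomainModel. Keep in mind that combineLatest emits only after every source has emitted at least once, so a source that completes without sending anything leaves the combined Observable empty.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxObservable
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical stand-in for profileRepository.getFollowers().
suspend fun getFollowers(): List<String> {
    delay(10) // simulate suspending work
    return listOf("alice", "bob")
}

// Option 1: rxSingle emits whatever the block returns, then completes.
fun followersViaSingle(): Observable<List<String>> =
    rxSingle(Dispatchers.IO) { getFollowers() }.toObservable()

// Option 2: rxObservable's block returns Unit, so the value has to be
// sent explicitly; without send the Observable completes empty.
fun followersViaObservable(): Observable<List<String>> =
    rxObservable<List<String>>(Dispatchers.IO) { send(getFollowers()) }

fun main() {
    println(followersViaSingle().blockingFirst())
    println(followersViaObservable().blockingFirst())
}
```

Either function can be passed to Observables.combineLatest in place of the original rxObservable call. The rxSingle version is probably the closer match for a suspend function that returns exactly one value.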

Upvotes: 4
