ozmank

Reputation: 763

Subscribing on Schedulers.io() is not working

My ViewModel calls repository methods to fetch data from a Room database and from the network.

    class Repository @Inject constructor(
        private val remoteDatasource: IRemoteSource,
        private val localDatasource: ILocalSource,
        private val subscriberScheduler: Scheduler,
        private val observerScheduler: Scheduler
    ) : IRepository {

        // Fetches data from Room
        override fun getData(): Flowable<Boolean> {
            return localDatasource.shouldFetchRemote()
                .subscribeOn(subscriberScheduler)
                .observeOn(observerScheduler)
        }

        // Makes the API call
        override fun getRemoteData(): Flowable<Data> {
            return remoteDatasource.getData()
                .subscribeOn(subscriberScheduler)
                .observeOn(observerScheduler)
        }
    }

subscriberScheduler is Schedulers.io() and observerScheduler is AndroidSchedulers.mainThread(). When I query Room, I get an exception saying that the operation is running on the main thread. When I fetch data from the remote source and check the thread, it is also the main thread, but no exception (such as a network-call-on-main-thread error) is thrown.

Here is my local source class, which uses Room:

class Localsource constructor(private val dataDao: DataDao) : ILocalSource {

    override fun shouldFetchRemote(): Flowable<Boolean> {
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread", "main thread")
            // this log prints
        }
        // exception thrown here
        return Flowable.just(dataDao.isDataPresent() != 0)
    }
}

Here is the class for the remote source:

@OpenForTesting
class Remotesource @Inject constructor() : IRemoteSource {

    override fun getData(): Flowable<Data> {
        if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
            Log.v("thread", "main thread")
            // this log prints, but no exception (such as network call on main thread) is thrown
        }
        return service.getData().flatMap { Flowable.just(it.data) }
    }
}

Upvotes: 0

Views: 193

Answers (1)

Pavlo Ostasha

Reputation: 16699

Your assumptions about which code runs on which thread are wrong, and that is the issue.

Let's look at the shouldFetchRemote() method.

// This part always runs on the main thread, because the method itself is called on it.
// Schedulers apply only to the reactive stream that is created
// (Flowable, Observable, Single, etc.), not to the rest of the code in the method.
if (Looper.getMainLooper().thread == Thread.currentThread()) {
    Log.v("thread", "main thread")
    // this log prints
}
// exception thrown here
// Yes, the exception is thrown on this line,
// because the database is accessed on the main thread here.

// Flowable.just() creates a stream from an already computed value
// and knows nothing about how or where that value was obtained.

// dataDao.isDataPresent() runs on the main thread
// because it is not yet part of the reactive stream; only its result is.
// That is the crucial point.
return Flowable.just(dataDao.isDataPresent() != 0)
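
The difference between eager and deferred evaluation can be checked outside Android. The sketch below is not the asker's code. It assumes RxJava 2 (the `io.reactivex` package; RxJava 3 uses `io.reactivex.rxjava3.core` instead), and `queryThreadName()` is a hypothetical stand-in for `dataDao.isDataPresent()` that simply reports the thread it ran on. `Flowable.fromCallable` is used here as one possible way to defer the call.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for dataDao.isDataPresent(): returns the name of the thread it ran on.
fun queryThreadName(): String = Thread.currentThread().name

// Eager: queryThreadName() is an ordinary argument expression, so it runs on the
// caller's thread before just() is even invoked; subscribeOn cannot move it.
fun eagerThread(): String =
    Flowable.just(queryThreadName())
        .subscribeOn(Schedulers.io())
        .blockingFirst()

// Deferred: the lambda runs at subscription time, on the subscribeOn scheduler.
fun deferredThread(): String =
    Flowable.fromCallable { queryThreadName() }
        .subscribeOn(Schedulers.io())
        .blockingFirst()

fun main() {
    println("caller:   ${Thread.currentThread().name}")
    println("eager:    ${eagerThread()}")    // same thread as the caller
    println("deferred: ${deferredThread()}") // a worker thread of the io() scheduler
}
```

In the eager variant the "query" reports the caller's thread even though `subscribeOn(Schedulers.io())` is applied, which mirrors the main-thread exception from Room in the question.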

To make the function part of the stream, you need a different approach. Room can return Flowables directly and can store booleans, so you can use it like this:

In DAO

@Query(...)
fun isDataPresent(): Flowable<Boolean>

In your local source

 override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()

This way it works as expected, because the whole query is now part of the reactive stream and reacts to the schedulers.

The same applies to the remote source. Retrofit can return Observables or Flowables directly once an RxJava call adapter is added to it:

interface Service {

    @GET("data")
    fun getData(): Flowable<Data>
}

// and the remote source will be

val service = retrofit.create(Service::class.java)

override fun getData(): Flowable<Data> = service.getData()

This way everything works as expected, because the call is now part of the stream.
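
Wiring this up might look like the sketch below. It assumes the Retrofit 2 RxJava 2 call adapter and the Gson converter artifacts are on the classpath. The field in `Data`, the `createService` helper and the base URL are hypothetical placeholders and are not taken from the question.

```kotlin
import io.reactivex.Flowable
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET

// Hypothetical payload type; the real Data class is not shown in the question.
data class Data(val id: Int)

interface Service {
    @GET("data")
    fun getData(): Flowable<Data>
}

// Hypothetical helper: builds a Retrofit instance whose service methods
// are allowed to return RxJava 2 types such as Flowable.
fun createService(baseUrl: String): Service =
    Retrofit.Builder()
        .baseUrl(baseUrl) // should end with '/'
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build()
        .create(Service::class.java)

fun main() {
    val service = createService("https://example.com/")
    // No request is sent here: the HTTP call starts only when the Flowable is
    // subscribed to, so subscribeOn() decides which thread performs it.
    val request: Flowable<Data> = service.getData()
    println("request object created on ${Thread.currentThread().name}: $request")
}
```

Because the adapter builds the request lazily, calling `service.getData()` on the main thread is harmless; only the subscription thread matters.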

If you want to use plain (non-reactive) data from Room or Retrofit, you can do that too. The only catch is that Flowable.just() won't work, because its argument is evaluated before the stream exists.

For example, for your local source you would need something like this:

// DAO
@Query(...)
fun isDataPresent(): Boolean


override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
    { emitter ->
        emitter.onNext(dataDao.isDataPresent())
        // onComplete matters: without it the stream never terminates
        emitter.onComplete()
        // There is also emitter.onError(throwable: Throwable) to report errors
    }, BackpressureStrategy.LATEST) // there are different backpressure strategies

There are similar factories for Observable and the other reactive stream types.

In general, I recommend reading the documentation.
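
As one illustration of such a factory, a one-shot value like this fits `Single.fromCallable`. This is a sketch under assumptions rather than the asker's code: it assumes RxJava 2, and the `DataDao` interface and `LocalSource` class below are hypothetical stand-ins with a blocking `isDataPresent()`.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical blocking DAO, standing in for a Room DAO that returns plain data.
interface DataDao {
    fun isDataPresent(): Boolean
}

class LocalSource(private val dataDao: DataDao) {
    // The lambda is not run here. It runs once per subscriber, on whatever
    // scheduler subscribeOn() selects; an exception thrown by the DAO is
    // delivered to the subscriber as onError.
    fun shouldFetchRemote(): Single<Boolean> =
        Single.fromCallable { dataDao.isDataPresent() }
}

fun main() {
    val caller = Thread.currentThread().name
    var queryThread = ""
    val dao = object : DataDao {
        override fun isDataPresent(): Boolean {
            queryThread = Thread.currentThread().name // record where the "query" ran
            return true
        }
    }
    val present = LocalSource(dao).shouldFetchRemote()
        .subscribeOn(Schedulers.io())
        .blockingGet()
    println("present=$present, caller=$caller, query ran on $queryThread")
}
```

Compared with `Flowable.create`, there is no emitter to manage: the returned value is emitted and the stream completes on its own.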

Upvotes: 2
