Reputation: 763
My ViewModel calls repository methods to fetch data from a Room database and from the network.
class Repository @Inject constructor(
    private val remoteDatasource: IRemoteSource,
    private val localDatasource: ILocalSource,
    private val subscriberScheduler: Scheduler,
    private val observerScheduler: Scheduler
) : IRepository {

    // Fetches data from Room
    override fun getData(): Flowable<Boolean> {
        return localDatasource.shouldFetchRemote()
            .subscribeOn(subscriberScheduler)
            .observeOn(observerScheduler)
    }

    // Makes the API call
    override fun getRemoteData(): Flowable<Data> {
        return remoteDatasource.getData()
            .subscribeOn(subscriberScheduler)
            .observeOn(observerScheduler)
    }
}
subscriberScheduler is Schedulers.io() and observerScheduler is AndroidSchedulers.mainThread(). When I query Room, I get an exception saying that the operation is running on the main thread. When I fetch from the remote source I also check the thread, and it is the main thread too, but no exception such as NetworkOnMainThreadException is thrown.
Here is my localsource class which uses room:
class Localsource constructor(private val dataDao: DataDao) : ILocalSource {
    override fun shouldFetchRemote(): Flowable<Boolean> {
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread", "main thread")
            // this log prints
        }
        // exception thrown here
        return Flowable.just(dataDao.isDataPresent() != 0)
    }
}
Here is the class for the remote source:
@OpenForTesting
class Remotesource @Inject constructor() : IRemoteSource {
    override fun getData(): Flowable<Data> {
        if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
            Log.v("thread", "main thread")
            // this log prints, but no exception such as a network call on the main thread is thrown
        }
        return service.getData().flatMap { Flowable.just(it.data) }
    }
}
Upvotes: 0
Views: 193
Reputation: 16699
Your assumptions about which code runs on which thread are wrong, and that is the problem.
Let's look at the shouldFetchRemote() method.
// This part always runs on the main thread, because the method itself is called there.
// Schedulers apply only to the reactive stream that gets created
// (Flowable, Observable, Single, etc.), not to the rest of the code in the method.
if (Looper.getMainLooper().thread == Thread.currentThread()) {
    Log.v("thread", "main thread")
    // this log prints
}
// exception thrown here
// Yes, the exception is thrown on this line,
// because you do reach for the database on the main thread here.
// Flowable.just() wraps an already computed value
// and knows nothing about schedulers.
// dataDao.isDataPresent() runs on the main thread because it is not part of
// the reactive stream; only its result is. That is the crucial point.
return Flowable.just(dataDao.isDataPresent() != 0)
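The difference can be seen in a small standalone sketch. It assumes RxJava 2 (the `io.reactivex` package); `blockingQuery()` is a hypothetical stand-in for the DAO call and simply reports the thread it ran on. `Flowable.fromCallable` is used here only as one way to defer work until subscription; it is not something the code in the question does.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for a blocking DAO call: returns the name of the thread it ran on.
fun blockingQuery(): String = Thread.currentThread().name

// blockingQuery() is evaluated right here, on the caller's thread,
// before just() is even invoked. subscribeOn() cannot move it.
fun eagerThread(): String =
    Flowable.just(blockingQuery())
        .subscribeOn(Schedulers.io())
        .blockingFirst()

// The lambda runs only when the stream is subscribed,
// so subscribeOn() decides which thread executes it.
fun deferredThread(): String =
    Flowable.fromCallable { blockingQuery() }
        .subscribeOn(Schedulers.io())
        .blockingFirst()

fun main() {
    println("caller:       ${Thread.currentThread().name}")
    println("just:         ${eagerThread()}")
    println("fromCallable: ${deferredThread()}")
}
```

`eagerThread()` reports the caller's thread, while `deferredThread()` reports one of the io scheduler's worker threads.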
To make the query part of the stream, you need a different approach. Room (with its RxJava artifact) can return a Flowable directly from a query, and it can store booleans, so you can write this:
In the DAO:
@Query(...)
fun isDataPresent(): Flowable<Boolean>
In your local source
override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()
This works as expected, because the query itself is now part of the reactive stream and responds to schedulers.
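The same goes for the remote source. Retrofit can return Observables or Flowables once its RxJava call adapter is added. That is also why you see no exception there: calling service.getData() on the main thread only builds the stream, and the request itself runs when the stream is subscribed, on the scheduler passed to subscribeOn().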
interface Service {
    @GET("data")
    fun getData(): Flowable<Data>
}
// and the remote source becomes
val service = retrofit.create(Service::class.java)
override fun getData(): Flowable<Data> = service.getData()
This way everything works as expected, because the call is now part of the stream.
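For Retrofit to produce RxJava types, the call adapter has to be registered when the Retrofit instance is built. A minimal sketch, assuming Retrofit 2 with the `adapter-rxjava2` and `converter-gson` artifacts on the classpath; the `Data` class, the Gson converter, and the `buildService` helper are assumptions for illustration, not taken from the question.

```kotlin
import io.reactivex.Flowable
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET

// Hypothetical payload type standing in for the question's Data class.
data class Data(val id: Int)

interface Service {
    @GET("data")
    fun getData(): Flowable<Data>
}

// Hypothetical helper: builds a Service whose methods may return RxJava 2 types.
fun buildService(baseUrl: String): Service =
    Retrofit.Builder()
        .baseUrl(baseUrl) // must end with '/'
        .addConverterFactory(GsonConverterFactory.create())
        // Lets interface methods return Flowable, Observable, Single, etc.
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .build()
        .create(Service::class.java)
```

Calling `buildService(...).getData()` performs no I/O by itself; the request is executed when the returned Flowable is subscribed.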
If you want to use plain (non-reactive) return types from Room or Retrofit, you can do that too. The only catch is that Flowable.just() will not work, because its argument is evaluated before the stream exists.
For example, for your local source you would need something like:
// DAO
@Query(...)
fun isDataPresent(): Boolean

// Local source
override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
    { emitter ->
        emitter.onNext(dataDao.isDataPresent())
        emitter.onComplete() // Terminates the stream; without it subscribers never receive onComplete
        // There is also emitter.onError(throwable) for reporting errors
    }, BackpressureStrategy.LATEST) // there are other backpressure strategies as well
There are similar factories for Observable and the other reactive types.
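The Flowable.create approach can be tried without Android. The sketch below assumes RxJava 2; `CountDao` is a hypothetical stand-in for the Room DAO (returning a row count, as in the question), and the explicit try/catch is there only to show where `onError` fits.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for the Room DAO; a real one would run a blocking query.
interface CountDao {
    fun isDataPresent(): Int
}

fun shouldFetchRemote(dao: CountDao): Flowable<Boolean> =
    Flowable.create<Boolean>({ emitter ->
        try {
            // Runs on subscription, on whatever thread subscribeOn() selects.
            emitter.onNext(dao.isDataPresent() != 0)
            emitter.onComplete()
        } catch (e: Exception) {
            // Report the failure through the stream instead of throwing.
            emitter.onError(e)
        }
    }, BackpressureStrategy.LATEST)

fun main() {
    val dao = object : CountDao {
        override fun isDataPresent(): Int = 1
    }
    val result = shouldFetchRemote(dao)
        .subscribeOn(Schedulers.io()) // the DAO call happens on an io thread
        .blockingFirst()
    println(result)
}
```

Because the DAO call sits inside the lambda, nothing touches the database until a subscriber arrives, which is what lets the repository's subscribeOn() take effect.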
In general, I recommend reading the RxJava documentation.
Upvotes: 2