Reputation: 2809
I am developing an Android app using RxJava. For example, I want to get some user data from the local database. But if the local database has no users, I should get the users using the REST API.
class Presenter {
    val mDisposable = CompositeDisposable()

    override fun getUsersFromLocal() {
        Log.d("TAG", "This is called just once")
        mDisposable.add(localDatabase.userDao().getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { // This "subscribe" is the problem: it is called multiple times.
                Log.d("TAG", "This subscribe is called multiple times, more than 10 times")
                if (it.isEmpty()) {
                    secondCall() // i.e. getUsersFromRemote() below
                } else {
                    view.onUsersLoaded(it)
                }
            })
    }

    override fun getUsersFromRemote() {
        mDisposable.add(restApi.getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                saveLocal(it) // If I remove just this line, subscribe is called once. (works fine)
                view.onUsersLoaded(it)
            })
    }
}
I am using nested(?) RxJava calls. I don't know why the subscribe callback of the first call (getUsersFromLocal) is invoked multiple times. If I remove the "secondCall()" call and its logic, "subscribe" is called just once.
I found one clue: the "saveLocal" method runs on a background thread.
private fun saveLocal(users: List<User>) {
    users.forEach {
        appExecutors.diskIo.execute {
            localDatabase.userDao().saveUser(it)
        }
    }
}
I changed the code above to the following.
private fun saveLocal(users: List<User>) {
    appExecutors.diskIo.execute {
        localDatabase.userDao().saveUsers(users)
    }
}
After this change, "subscribe" is called just twice. So the problem is still there, but before the change "subscribe" was called many more times. (I think it was called as many times as there are users in the list.)
I have also added the code for "UserDao" and "RestApiService".
import android.arch.persistence.room.*
import io.reactivex.Flowable
import io.reactivex.Single

@Dao
interface UserDao {
    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Flowable<List<User>>
}
And here is the "RestApiService class".
import io.reactivex.Observable
import retrofit2.http.GET
import retrofit2.http.Headers

interface RestApiService {
    @Headers(API_HEADER_AUTHORIZATION, API_HEADER_ACCEPT)
    @GET("/users")
    fun getUsers(): Observable<UserList> // UserList class has a list of the user
}
Upvotes: 4
Views: 2694
Reputation: 286
For your case, you need to use 'Maybe' instead of 'Flowable'.
With Flowable, whenever you insert new data or update a previously inserted row, the Flowable emits again automatically and your subscribe callback is called again.
This change might fix your code:
@Dao
interface UserDao {
    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Maybe<List<User>>
}
For more information about Flowable, Single, and Maybe, see the article "Room implementation with RxJava".
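To illustrate the re-emission behaviour described above without needing Room, here is a minimal sketch. It assumes RxJava 2 on the classpath and uses a `BehaviorProcessor` as a stand-in for the observable query. That substitution is an assumption made for illustration, and `countEmissions` is a hypothetical helper, not part of the question's code. Each simulated write pushes a new snapshot, so a plain subscription is called once per write, while a one-shot read (here `firstElement()`, which returns a `Maybe`) is called only once.

```kotlin
import io.reactivex.processors.BehaviorProcessor

// Hypothetical stand-in for an observable table: every "write" pushes a new
// snapshot, similar to how an observable query re-emits after the data changes.
fun countEmissions(useFirstElement: Boolean, writes: Int): Int {
    val table = BehaviorProcessor.createDefault(emptyList<String>())
    var calls = 0

    val disposable = if (useFirstElement) {
        // One-shot read: terminates after the first snapshot.
        table.firstElement().subscribe { calls++ }
    } else {
        // Continuous read: stays subscribed and sees every later write.
        table.subscribe { calls++ }
    }

    // Simulate saving users one at a time, like the per-user saveLocal() loop.
    repeat(writes) { i -> table.onNext(table.value!! + "user$i") }
    disposable.dispose()
    return calls
}

fun main() {
    println(countEmissions(useFirstElement = false, writes = 3)) // 4: initial snapshot + 3 writes
    println(countEmissions(useFirstElement = true, writes = 3))  // 1
}
```

This also matches the observation in the question that saving users one by one produced roughly one extra callback per user, while a single batched save produced only one extra callback.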
Upvotes: 2
Reputation: 567
You should use the switchIfEmpty operator instead of an if statement inside the subscription. As written, you are effectively making two subscriptions.
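A minimal sketch of that idea, assuming RxJava 2.2 or later and a one-shot local source. `loadUsers` and its `local` and `remote` parameters are hypothetical stand-ins for `userDao().getUsers()` and `restApi.getUsers()`, with `String` in place of `User`. One caveat: a query that returns a list typically emits an empty list rather than an empty stream, so the sketch filters out the empty list first. Otherwise `switchIfEmpty` would never trigger.

```kotlin
import io.reactivex.Maybe
import io.reactivex.Single

// Hypothetical stand-ins: `local` plays the role of the DAO query,
// `remote` plays the role of the REST call.
fun loadUsers(
    local: Maybe<List<String>>,
    remote: Single<List<String>>
): Single<List<String>> =
    local
        .filter { it.isNotEmpty() } // turn "empty list" into "no value"
        .switchIfEmpty(remote)      // subscribe to remote only when local had nothing

fun main() {
    val remote = Single.fromCallable { listOf("remote-user") }

    // Local database is empty: falls back to the remote source.
    println(loadUsers(Maybe.just(emptyList<String>()), remote).blockingGet()) // [remote-user]

    // Local database has data: the remote source is never subscribed.
    println(loadUsers(Maybe.just(listOf("local-user")), remote).blockingGet()) // [local-user]
}
```

With this shape the presenter would need only one `subscribe` call on the combined chain, and saving the remote result locally could be attached to `remote` (for example with `doOnSuccess`) rather than done inside a nested subscription.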
Upvotes: -1