dev.farmer
dev.farmer

Reputation: 2809

Why RxJava's "subscribe" method called multiple times?

I am developing an Android app using RxJava. For example, I want to get some user data from the local database. But if the local database has no users, I should get the users using the REST API.

class Presenter {
    val mDisposable = CompositeDisposable()

    override fun getUsersFromLocal() {
        Log.d("TAG", "This is called just one")
        mDisposable.add(localDatabase.userDao().getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { // This "subscribe" is the problem. Here is called multiple......
                Log.d("TAG", "This subscribe is called multiple, It called more than 10 times")
                if (it.isEmpty()) {
                    secondCall()
                } else {
                    view.onUsersLoaded(it)
                }
            })
    }

    override fun getUsersFromRemote() {
        mDisposable.add(restApi.getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                saveLocal(it) // If I remove just this line, subscribe is called once. (works fine)
                view.onUsersLoaded(it)
            })
    }
}

I am using the nested(?) RxJava. I don't know why the firstCall's subscribe method is called multiple... If I remove "secondCall()" method and logic, "subscribe" just calls once.


I found one clue. The "saveLocal" method is running in background thread.

    private fun saveLocal(users: List<User>) {
        users.forEach {
            appExecutors.diskIo.execute {
                localDatabase.userDao().saveUser(it)
            }
        }
    }

I changed upper origin code to below.

    private fun saveLocal(users: List<User>) {
        appExecutors.diskIo.execute {
            localDatabase.userDao().saveUsers(users)
        }
    }

After changed, the "subscribe" is called just twice. Yes, it still has the problem. But before that, the "subscribe" was called multiple times. (I think it was called as much as the "users" size.)


I added "UserDao class" and "RestApi class" code too.

import android.arch.persistence.room.*
import io.reactivex.Flowable
import io.reactivex.Single

@Dao
interface UserDao {

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Flowable<List<User>>

}

And here is the "RestApiService class".

import io.reactivex.Observable

interface RestApiService {
    @Headers(API_HEADER_AUTHORIZATION, API_HEADER_ACCEPT)
    @GET("/users")
    fun getUsers(): Observable<UserList> // UserList class has a list of the user
}

Upvotes: 4

Views: 2694

Answers (2)

moji
moji

Reputation: 286

You need to use 'Maybe' instead of 'Flowable' for your case.

Using Flowable, if you again add new data or update your previous inserted object, Flowable object will emit automatically and will call subscribe method again:

This change might fix your code:

@Dao
interface UserDao {

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Maybe<List<User>>

}

For more information about Flowable, Single, and Maybe look at the article below" Room implementation with RxJava

Upvotes: 2

Vaios
Vaios

Reputation: 567

You should use switchIfEmpty operator and not use an if statement in the subscription. Basically you make two subscriptions

Upvotes: -1

Related Questions