Tyler Pfaff

Reputation: 5032

How can I synchronously subscribe to an observable so that I don't miss out on emissions from that observable?

I have an Android app with an MVVM architecture.

The view layer (a fragment) subscribes in onStart() to an observable exposed by the ViewModel. Right after I call subscribe() on that observable, I invoke a method on the ViewModel directly to kick things off. That invocation does two things. First, the observable I subscribed to emits an event representing the loading state. Next, the ViewModel fetches some data and then emits that data.

The problem is that I don't receive the first emission. If, however, I move my call to subscribe() farther up the lifecycle chain, such as into onCreate() (leaving my invocation in onStart()), I do receive the emission. Clearly, the call to subscribe() is asynchronous. How can I make sure that I am subscribed to an observable before I start emitting from it?
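
The missed emission can be reproduced off-device. The following is a minimal sketch, not the app's code: it assumes RxJava 2 on the classpath and uses a TestScheduler as a stand-in for the main-thread looper, on the assumption that AndroidSchedulers.mainThread() queues the subscription rather than running it inline. The function name collectStates and the string states are hypothetical.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject

// Returns the states an observer actually receives.
// deferSubscription = true mimics subscribeOn(AndroidSchedulers.mainThread()).
fun collectStates(deferSubscription: Boolean): List<String> {
    val looper = TestScheduler()                 // assumption: stands in for the main looper
    val subject = PublishSubject.create<String>()
    val received = mutableListOf<String>()

    // With subscribeOn, the real subscription to the subject is only scheduled here.
    val source = if (deferSubscription) subject.subscribeOn(looper) else subject
    source.subscribe { received += it }

    subject.onNext("Loading")                    // emitted immediately, like loadMovies()
    looper.triggerActions()                      // queued work (the subscription) runs now
    subject.onNext("ListReady")                  // emitted later, like the network result
    return received
}

fun main() {
    println(collectStates(deferSubscription = true))   // prints [ListReady]
    println(collectStates(deferSubscription = false))  // prints [Loading, ListReady]
}
```

In this sketch, a PublishSubject drops any item emitted while it has no observers, so the first state is lost only when the subscription itself is deferred to a scheduler.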

Here is the case where the first emission is not received.

//The fragment
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    viewModel = ViewModelProviders.of(this).get(OverviewFragmentViewModel::class.java)
}

override fun onStart() {
    super.onStart()
    allSubscriptions.add(viewModel.uiStateChanged
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ uiState ->
            when (uiState) {
                is UiState.Loading -> showLoadingView()
                is UiState.ListReady -> showList(uiState)
                is UiState.Error -> showErrorView()
            }
        }, { error ->
            Log.e(TAG, error.message, error)
        })
    )
    viewModel.loadMovies()
}
}


//The ViewModel

class OverviewFragmentViewModel : ViewModel(){

    val uiStateChanged = PublishSubject.create<UiState>()
    val model = OverviewFragmentRepo()

    companion object {
        val TAG = OverviewFragmentViewModel::class.java.simpleName
    }

    override fun onCleared() {
        super.onCleared()
    }

    fun loadMovies(){
        //This is the emission that happens too fast for the fragment to receive it!
        uiStateChanged.onNext(UiState.Loading())
        model.getMovies()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({response ->
                uiStateChanged.onNext(UiState.ListReady(response.results))
            }, { error ->
                uiStateChanged.onNext(UiState.Error())
                Log.e(TAG, error.message, error)
            })
    }
}

Now, if I just move the subscription up, the emission is received. However, I don't want to hope that things complete in time; I want a guarantee that I am already subscribed before making the direct invocation to loadMovies(). Here is the same code with the subscription moved up, in which the emission is received.

    //The fragment
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        viewModel = ViewModelProviders.of(this).get(OverviewFragmentViewModel::class.java)

        allSubscriptions.add(viewModel.uiStateChanged
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ uiState ->
                when (uiState) {
                    is UiState.Loading -> showLoadingView()
                    is UiState.ListReady -> showList(uiState)
                    is UiState.Error -> showErrorView()
                }
            }, { error ->
                Log.e(TAG, error.message, error)
            })
        )
    }

    override fun onStart() {
        super.onStart()
        viewModel.loadMovies()
    }


    //The ViewModel

    class OverviewFragmentViewModel : ViewModel(){

        val uiStateChanged = PublishSubject.create<UiState>()
        val model = OverviewFragmentRepo()

        companion object {
            val TAG = OverviewFragmentViewModel::class.java.simpleName
        }

        override fun onCleared() {
            super.onCleared()
        }

        fun loadMovies(){
            //This is the emission that happens too fast for the fragment to receive it!
            uiStateChanged.onNext(UiState.Loading())
            model.getMovies()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({response ->
                    uiStateChanged.onNext(UiState.ListReady(response.results))
                }, { error ->
                    uiStateChanged.onNext(UiState.Error())
                    Log.e(TAG, error.message, error)
                })
        }
    }

Upvotes: 1

Views: 330

Answers (1)

borichellow

Reputation: 1001

From my perspective, instead of creating a PublishSubject and calling loadMovies(), you can build the observable directly in the ViewModel, like this:

val uiStateChanged = model.getMovies()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())    
    .compose(ResponseOrError.toResponseOrErrorObservable())
    .map { if (it.isData) UiState.ListReady(it.data().results) else UiState.Error() }
    .startWith(UiState.Loading())

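Then subscribe to this Observable in the Fragment and remove the call to viewModel.loadMovies(). Because startWith() emits UiState.Loading() to each subscriber at the moment it subscribes, the loading state cannot be missed.

Read more about ResponseOrError.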
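
For readers who want to try the idea without Android or the ResponseOrError helper, here is a minimal sketch assuming only RxJava 2 on the classpath. It swaps the ResponseOrError step for the standard onErrorReturn operator, which is a substitution and not the snippet above. The UiState class, getMovies() and uiStates() are hypothetical stand-ins for the types in the question.

```kotlin
import io.reactivex.Observable

// Hypothetical, simplified version of the question's UiState hierarchy.
sealed class UiState {
    object Loading : UiState()
    data class ListReady(val titles: List<String>) : UiState()
    object Error : UiState()
}

// Hypothetical stand-in for model.getMovies(); a real one would hit the network.
fun getMovies(): Observable<List<String>> =
    Observable.fromCallable { listOf("Movie A", "Movie B") }

// Cold observable: nothing runs until an observer subscribes, and every
// observer sees Loading first, followed by either ListReady or Error.
fun uiStates(source: Observable<List<String>>): Observable<UiState> =
    source
        .map<UiState> { UiState.ListReady(it) }
        .onErrorReturn { UiState.Error }   // swapped in for ResponseOrError
        .startWith(UiState.Loading)

fun main() {
    uiStates(getMovies()).subscribe { println(it) }
    uiStates(Observable.error(RuntimeException("boom"))).subscribe { println(it) }
}
```

Since the stream is cold, there is no window between subscribing and loading in which an emission could be dropped. In the real app the subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) calls from the answer would still be needed for threading.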

Upvotes: 1
