ZakTaccardi
ZakTaccardi

Reputation: 12467

RxJava: How can I reset a long running hot observable chain?

For my app's search feature, I have a hot observable chain that does the following.

  1. Accepts user input string into an EditText (a TextChangedEvent) (on mainThread)
  2. Debounce 300ms (on computation thread)
  3. Show loading spinner (mainThread)
  4. Query the SQL db with that string (this query can take anywhere from 100ms to 2000ms) (on Schedulers.io())
  5. Display results to the user (mainThread)

Because step 3 is so variable in length, a race condition occurs where less recent search results are displayed over more recent results (sometimes). Let's say a user wants to type chicken, but because of weird typing speeds, the first part of the word is emitted before the whole term:

How can I handle this scenario?

Full observable code:

subscription = WidgetObservable.text(searchText)
                .debounce(300, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                        //do this on main thread because it's a UI element (cannot access a View from a background thread)

                        //get a String representing the new text entered in the EditText
                .map(new Func1<OnTextChangeEvent, String>() {
                    @Override
                    public String call(OnTextChangeEvent onTextChangeEvent) {
                        return onTextChangeEvent.text().toString().trim();
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        presenter.handleInput(s);
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s != null && s.length() >= 1 && !s.equals("");
                    }
                }).doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Timber.d("searching for string: '%s'", s);
                    }
                })
                        //run SQL query and get a cursor for all the possible search results with the entered search term
                .flatMap(new Func1<String, Observable<SearchBookmarkableAdapterViewModel>>() {
                    @Override
                    public Observable<SearchBookmarkableAdapterViewModel> call(String s) {
                        return presenter.getAdapterViewModelRx(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                        //have the subscriber (the adapter) run on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                        //subscribe the adapter, which receives a stream containing a list of my search result objects and populates the view with them
                .subscribe(new Subscriber<SearchBookmarkableAdapterViewModel>() {
                    @Override
                    public void onCompleted() {
                        Timber.v("Completed loading results");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "Error loading results");
                        presenter.onNoResults();
                        //resubscribe so the observable keeps working.
                        subscribeSearchText();
                    }

                    @Override
                    public void onNext(SearchBookmarkableAdapterViewModel searchBookmarkableAdapterViewModel) {
                        Timber.v("Loading data with size: %d into adapter", searchBookmarkableAdapterViewModel.getSize());
                        adapter.loadDataIntoAdapter(searchBookmarkableAdapterViewModel);
                        final int resultCount = searchBookmarkableAdapterViewModel.getSize();
                        if (resultCount == 0)
                            presenter.onNoResults();
                        else
                            presenter.onResults();
                    }
                });

Upvotes: 3

Views: 1914

Answers (1)

Brandon
Brandon

Reputation: 39192

Use switchMap instead of flatMap. That will cause it to throw away* the previous query whenever you start a new query.

*How this works:

Whenever the outer source observable produces a new value, switchMap calls your selector to return a new inner observable (presenter.getAdapterViewModelRx(s) in this case). switchMap then unsubscribes from the previous inner observable it was listening to and subscribes to the new one.

Unsubscribing from the previous inner observable has two effects:

  1. Any notification (value, completion, error, etc) produced by the observable will be silently ignored and thrown away.

  2. The observable will be notified that its observer has unsubscribed and can optionally take steps to cancel whatever asynchronous process it represents.

Whether your abandoned queries are actually cancelled or not is entirely dependent upon the implementation of presenter.getAdapterViewModelRx(). Ideally they would be canceled to avoid needlessly wasting server resources. But even if they keep running, #1 above prevents your typeahead code from seeing stale results.

Upvotes: 3

Related Questions