TaQuangTu

Reputation: 2343

RxJava - How to stop a running task when new item emitted

I am building a search feature for my Android app with RxJava. When the user types or changes the query in an EditText, an observable emits the new query text and I search my database for results that match it.

getEditTextObservable()
    .subscribeOn(Schedulers.computation())
    .debounce(500, TimeUnit.MILLISECONDS)
    .filter { !it.isNullOrEmpty() }
    .map {
        // start searching
        getResultsInDatabase(it) // this function takes a long time to complete
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // render results on screen
    }

The method getResultsInDatabase(string: String) takes a long time to complete. When the query text changes, I want to stop the call that is still running for the previously emitted query and run it again with the new query. How can I achieve that? I would appreciate your help. Thank you for reading my question.

Upvotes: 1

Views: 538

Answers (2)

PPartisan

Reputation: 8231

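You need switchMap. When a new query arrives, it disposes the Observable created for the previous query and subscribes to a new one, so only results for the latest query reach the subscriber.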

getEditTextObservable()
    .subscribeOn(Schedulers.computation())
    .debounce(500, TimeUnit.MILLISECONDS)
    .filter { !it.isNullOrEmpty() } //Be careful with this. Should clearing text clear results too?
    .switchMap { 
        Observable.fromCallable { getResultsInDatabase(it) }.subscribeOn(Schedulers.io()) 
    }.observeOn(AndroidSchedulers.mainThread())
    .subscribe{
        //render results on screen
    }
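
To see the cancellation happen off-device, here is a minimal JVM sketch of the same switchMap pattern. It assumes RxJava 3 package names (adjust the imports for RxJava 2). slowSearch, cancelled, searchLatest and runDemo are hypothetical names made up for this illustration, with slowSearch standing in for getResultsInDatabase. When switchMap disposes the previous inner Observable, the io() worker thread running it is normally interrupted, so a blocking call such as Thread.sleep throws InterruptedException; the sketch catches that inside the search function.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList

// Queries whose search was interrupted before it finished (for illustration only).
val cancelled = CopyOnWriteArrayList<String>()

// Hypothetical stand-in for getResultsInDatabase: sleeps to simulate slow work.
fun slowSearch(query: String): List<String> =
    try {
        Thread.sleep(300)
        listOf("result for $query")
    } catch (e: InterruptedException) {
        // The inner Observable was disposed while we were working.
        // Whatever we return here is dropped, because nobody is subscribed any more.
        cancelled.add(query)
        emptyList()
    }

// Each new query disposes the previous search and starts a fresh one on io().
fun searchLatest(queries: Observable<String>): Observable<List<String>> =
    queries.switchMap { query ->
        Observable.fromCallable { slowSearch(query) }
            .subscribeOn(Schedulers.io())
    }

fun runDemo(): List<List<String>> {
    val queries = PublishSubject.create<String>()
    val rendered = CopyOnWriteArrayList<List<String>>()
    val disposable = searchLatest(queries).subscribe { rendered.add(it) }

    queries.onNext("a")   // starts a search for "a"
    Thread.sleep(100)
    queries.onNext("ab")  // disposes the "a" search, starts one for "ab"
    Thread.sleep(1000)    // give the "ab" search time to finish

    disposable.dispose()
    return rendered.toList()
}

fun main() {
    println("rendered = ${runDemo()}")
    println("cancelled = $cancelled")
}
```

Only the result for "ab" should be rendered. Note that if the search function lets the InterruptedException escape after disposal, RxJava has no subscriber to deliver it to and routes it to RxJavaPlugins.onError, which by default ends up in the thread's uncaught exception handler. On Android that usually means a crash, so either catch the interruption inside the search function as above or install a global handler with RxJavaPlugins.setErrorHandler. Also keep in mind that interruption only stops work that actually reacts to it; a database call that ignores interrupts will run to completion and its result is simply discarded.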

Upvotes: 2

Manoj Perumarath

Reputation: 10214

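You can use the valve operator to pause a stream. It is not part of core RxJava; it comes from the RxJavaExtensions library. The valve is driven by a second stream of Boolean values: while the latest value is true the stream keeps emitting, and while it is false the stream is kept in a paused state.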

Upvotes: 1
