Tarun Dholakiya
Tarun Dholakiya

Reputation: 839

RxJava : How to maintain Observable alive even after getting error in onError() or ReSubscribe the same Observable

Actually I have created a RxSearch type configuration. In which I have attached an Edittext textChangeListener with the PublishSubject. Using the events to send the characters to the Observable which is being used as input for the retrofit API call.

Problem

Only issue I m facing is sometime I got the error from API "unexpected end of stream" inside onError() callback of observable. Once I got the error, Observable stops working. Observable shuts down, not able to get the characters from PublishSubject's onNext().

Look at RxSearchObservable

class RxSearchObservable {
companion object {
    fun fromView(editText: EditText): Observable<String> {
        val subject = PublishSubject.create<String>()
        editText.addTextChangedListener(object : TextWatcher {
            override fun afterTextChanged(s: Editable?) {
                //subject.onComplete()
            }

            override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
                //subject.onNext(s.toString())
            }

            override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
                if (s.isNotEmpty()) subject.onNext(s.toString())
            }
        })
        return subject
    }
}
}

How I subscribing and Making an Retrofit API call in side SwitchMap.

 RxSearchObservable.fromView(edtToolSearch)
                    .debounce(700, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .retryWhen { t -> t.delay(3, TimeUnit.SECONDS) }
                    .switchMap { searchTerm ->
                        runOnUiThread { progressBar.visibility = View.VISIBLE }
                        apiManager.getSearchUnits(searchTerm)
                    }
                    .onErrorResumeNext(Observable.empty())
                    .subscribe({ response ->
                        Log.i("Called subscribe", ":::::::::::+++++++++++++++ GONE")
                        progressBar.visibility = View.GONE
                        if (response.isSuccessful) {
                            val units = response.body()
                            val searchedDatasets = units?.dataset
                            if (searchedDatasets?.size!! > 0) {
                                val searchAdapter = SearchAdapter(this@MapActivity, searchedDatasets, false)
                                listSearch.visibility = View.VISIBLE
                                listSearch.adapter = searchAdapter
                            } else {
                                toast("No items found !!!")
                            }
                        } else {
                            apiError = ErrorUtils.parseError(response)
                            toast(apiError.msg)
                        }
                    }, { t: Throwable? ->
                        progressBar.visibility = View.GONE
                        toast(t?.message.toString())
                    }))

Any Idea, Help, Suggestion will be appreciated. Thanks in advance.

Upvotes: 0

Views: 1007

Answers (1)

tynn
tynn

Reputation: 39843

A stream which errors is terminated. You can retry() the subscription, but this should be done conditionally only. Maybe with timeout, maybe only a few times, maybe on certain errors only.

In your case you should consider handling the error of the API call within the switchMap. Like this the error doesn't reach the main stream.

.switchMap { searchTerm ->
    runOnUiThread { progressBar.visibility = View.VISIBLE }
    apiManager.getSearchUnits(searchTerm)
          .onErrorResumeNext(Observable.empty())
}

Upvotes: 0

Related Questions