Reputation: 2153
Many RxJava tutorials that demonstrate RxTextView.textChanges and debounce use 'live search' as their example, for instance Improving UX with RxJava. I implemented that example and started experimenting with it:
RxTextView.textChanges(searchView)
.observeOn(Schedulers.io())
.skip(1)
.debounce(DELAY_BEFORE_REQUEST_MS, TimeUnit.MILLISECONDS)
.map(new Func1<CharSequence, String>() {
@Override public String call(CharSequence charSequence) {
return charSequence.toString();
}
})
.switchMap(new Func1<String, Observable<Response>>() {
@Override public Observable<Response> call(String query) {
return retrofitService.search(query);
}
})
.subscribe();
Everything looked good until I decided to simulate a GPRS network on the Android emulator.
The first API call was triggered, and when I added the next letter to 'searchView', the app crashed with an InterruptedIOException:
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Caused by: rx.exceptions.OnErrorNotImplementedException: thread interrupted
at rx.Observable$27.onError(Observable.java:7923)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.onError(OperatorSubscribeOn.java:71)
at rx.observers.SerializedObserver.onError(SerializedObserver.java:159)
at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:79)
at rx.internal.operators.OperatorSwitch$SwitchSubscriber.error(OperatorSwitch.java:223)
at rx.internal.operators.OperatorSwitch$InnerSubscriber.onError(OperatorSwitch.java:282)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
at retrofit2.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:114)
at retrofit2.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:88)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorSwitch$SwitchSubscriber.onNext(OperatorSwitch.java:105)
at rx.internal.operators.OperatorSwitch$SwitchSubscriber.onNext(OperatorSwitch.java:60)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorDebounceWithTime$DebounceState.emit(OperatorDebounceWithTime.java:132)
at rx.internal.operators.OperatorDebounceWithTime$1$1.call(OperatorDebounceWithTime.java:79)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Caused by: java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$2.read(Okio.java:136)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
at okio.RealBufferedSource.indexOf(RealBufferedSource.java:306)
at okio.RealBufferedSource.indexOf(RealBufferedSource.java:300)
at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:196)
at okhttp3.internal.http.Http1xStream.readResponse(Http1xStream.java:184)
at okhttp3.internal.http.Http1xStream.readResponseHeaders(Http1xStream.java:125)
at okhttp3.internal.http.HttpEngine.readNetworkResponse(HttpEngine.java:723)
at okhttp3.internal.http.HttpEngine.access$200(HttpEngine.java:81)
at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:708)
at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:563)
at okhttp3.RealCall.ge
I have searched a little, and it looks like I'm not alone: first and second.
The author of the first question solved the problem by wrapping the Retrofit request in a try-catch block. To me that looks like an attempt to cover up bad architecture, and I'm looking for a cleaner solution.
Is there a way to ignore the result of the first API call and start a new one using RxJava? Or should I switch to the new Retrofit Call API and cancel the previous request myself (and break the reactive approach)?
I am using Retrofit 2 beta 3 with the newest Okio and OkHttp.
Upvotes: 2
Views: 2133
Reputation: 6803
Well, the error is pretty explicit: you should add onError handling. It could look something like this:
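.subscribe(new Observer<Response>() {
@Override
public void onCompleted() {
// Nothing to do when the stream completes.
}
@Override
public void onError(Throwable e) {
// Log or display the failure here; the subscription ends after this call.
}
@Override
public void onNext(Response response) {
// Render the search results.
}
});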
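As a rough illustration of why an error handler matters here, the sketch below uses only java.util.concurrent rather than RxJava. It assumes, as the 'thread interrupted' message in the trace suggests but does not prove, that the in-flight blocking call was interrupted when the earlier work was cancelled. InterruptedCallDemo, slowBlockingCall and runAndCancel are hypothetical names, and the sleep merely stands in for a slow HTTP read.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical demo class; none of these names come from RxJava, Retrofit or OkHttp. */
final class InterruptedCallDemo {

    /** Stand-in for a slow blocking read: reports interruption as InterruptedIOException. */
    static String slowBlockingCall(CountDownLatch started) throws InterruptedIOException {
        started.countDown(); // signal that the call is now running on the worker thread
        try {
            Thread.sleep(10_000); // pretend the response takes very long to arrive
            return "response";
        } catch (InterruptedException e) {
            throw new InterruptedIOException("thread interrupted");
        }
    }

    /**
     * Starts the slow call, cancels it with interruption, and returns whatever
     * the error handler received (null if the handler was never invoked).
     */
    static Throwable runAndCancel(Consumer<Throwable> onError) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<Throwable> seen = new AtomicReference<>();
        try {
            Future<?> call = worker.submit(() -> {
                try {
                    slowBlockingCall(started);
                } catch (Throwable t) {
                    seen.set(t);
                    onError.accept(t); // route the failure to the handler instead of crashing
                } finally {
                    finished.countDown();
                }
            });
            started.await();                      // wait until the call is in flight
            call.cancel(true);                    // cancel and interrupt the worker thread
            finished.await(5, TimeUnit.SECONDS);  // give the handler time to run
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        runAndCancel(err -> System.err.println("handled: " + err));
    }
}
```

Here the InterruptedIOException is handed to the handler instead of escaping the worker thread, which is roughly the role onError plays in the subscriber above.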
However, the subscription is terminated as soon as an error is emitted. You can avoid this by handling the errors of the API call inside switchMap, like this:
.switchMap(new Func1<String, Observable<Response>>() {
@Override public Observable<Response> call(String query) {
return retrofitService.search(query)
.onErrorResumeNext(Observable.<Response>empty());
}
})
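To make the intended behaviour concrete (only the latest query matters, and a failed or cancelled call ends quietly without killing the pipeline), here is a minimal model written with plain java.util.concurrent instead of RxJava. It is not how switchMap or onErrorResumeNext are implemented. LatestOnlySearch, query and demo are hypothetical names, and the fake API with its sleep durations is an assumption made purely for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: keep only the latest query, drop failed or stale calls. */
final class LatestOnlySearch {
    private final ExecutorService worker = Executors.newCachedThreadPool();
    private final Function<String, String> api; // stand-in for retrofitService.search
    private final Consumer<String> onResult;
    private long generation = 0;                 // id of the most recent query
    private Future<?> inFlight;

    LatestOnlySearch(Function<String, String> api, Consumer<String> onResult) {
        this.api = api;
        this.onResult = onResult;
    }

    synchronized void query(String text) {
        final long mine = ++generation;
        if (inFlight != null) {
            inFlight.cancel(true); // comparable to switchMap dropping the previous inner call
        }
        inFlight = worker.submit(() -> {
            String result;
            try {
                result = api.apply(text);
            } catch (RuntimeException e) {
                return; // comparable to onErrorResumeNext(empty()): this call just ends
            }
            deliver(mine, result);
        });
    }

    private synchronized void deliver(long mine, String result) {
        if (mine == generation) { // ignore results that belong to an older query
            onResult.accept(result);
        }
    }

    void shutdown() {
        worker.shutdownNow();
    }

    /** Fires two queries in quick succession against a fake slow API; returns what was delivered. */
    static List<String> demo() {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        LatestOnlySearch search = new LatestOnlySearch(
                q -> {
                    try {
                        Thread.sleep(q.length() == 1 ? 2_000 : 10); // the first query is the slow one
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("thread interrupted", e);
                    }
                    return "results for " + q;
                },
                r -> {
                    delivered.add(r);
                    done.countDown();
                });
        try {
            search.query("a");
            search.query("ab"); // cancels the call for "a"
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            search.shutdown();
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the interrupted call for "a" is swallowed and only the result for "ab" reaches the consumer, which mirrors what the switchMap plus onErrorResumeNext combination above is meant to achieve.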
Upvotes: 5