Reputation: 30568
I have a PublishSubject
and a Subscriber
which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
My problem is that none of the error handling methods help me out here:
onErrorResumeNext()
— instructs an Observable to emit a sequence of items if it encounters an error
onErrorReturn( )
— instructs an Observable to emit a particular item when it encounters an error
onExceptionResumeNext( )
— instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
retry( )
— if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
retryWhen( )
— if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
I tried retry()
for example but it hangs my process after the error indefinitely.
I also tried onErrorResumeNext()
but it does not work as expected:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
This only prints foo
and bar
.
Upvotes: 6
Views: 4487
Reputation: 70007
If you want to continue processing after an error, it means your error is a value just like your String
s and should go through onNext
. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T>
is available in RxJava 2:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
Upvotes: 12