Reputation: 390
I am working with the retry
operator in RxSwift. According to its documentation, it "resubscribes" to the source observable when it encounters an error.
This is all well and good. However, I'm not exactly sure how to reason about the "source observable" when it contains hot observables, or a mixture of hot/cold observables.
An example I am sure about:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let coldObservableRetry = coldObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2) // retry is inclusive of original attempt
coldObservableRetry.subscribe(onNext: { print ($0)} ) // prints 1, 2, 1, 2, before erroring out.
An example I am not sure about:
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let hotObservableRetry = hotObservable.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
hotObservableRetry.subscribe(onNext: { print ($0)} ) // What happens here?
Another example I am not sure about:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let delayedHotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).delay(.milliseconds(100), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservableRetry = Observable.combineLatest(coldObservable, delayedHotObservable).map { $0 + $1 }.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(5)
mixtureObservableRetry.subscribe(onNext: { print ($0)} ) // What happens here? What does it even mean to resubscribe to a combineLatest with a hot and a cold observable?
Yet another example I am not sure about:
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservable2Retry = coldObservable.flatMapLatest { _ in hotObservable }
.flatMapLatest { num in
if num % 3 == 0 { return .just(num) }
else { return .error() }
}
.retry(2)
mixtureObservable2Retry.subscribe(onNext: { print ($0)} ) // What happens here?
Upvotes: 1
Views: 237
Reputation: 33967
Many of your examples don't compile, so what happens is that you get an compile error. :-) But to answer the question asked...
There are some important things to keep in mind.
next
events and then a stop event (which is either completed
or error
.) It will give each subscription its own set of events.connect
is called. All the subscriptions will share the same set of events.Lastly in this case, debug
is your friend.
So for your first example you aren't sure about (adjusted so it will compile and run with some debug operators added):
func example() {
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug("before publish")
.publish()
let hotObservableRetry = hotObservable
.debug("after publish")
.map { (num) -> Int in
guard num % 3 != 0 else { throw MyError() }
return num
}
.debug("after map")
.retry(2)
.debug("after retry")
_ = hotObservableRetry.subscribe()
_ = hotObservable.connect()
}
Will produce the below output.
Here are some things to notice about the output that will help in the learning process.
The above are the key points to understand to answer this question.
after retry -> subscribed
after map -> subscribed
after publish -> subscribed
before publish -> subscribed
before publish -> Event next(0)
after publish -> Event next(0)
after map -> Event error(MyError())
after map -> isDisposed
after publish -> isDisposed
after map -> subscribed
after publish -> subscribed
before publish -> Event next(1)
after publish -> Event next(1)
after map -> Event next(1)
after retry -> Event next(1)
before publish -> Event next(2)
after publish -> Event next(2)
after map -> Event next(2)
after retry -> Event next(2)
before publish -> Event next(3)
after publish -> Event next(3)
after map -> Event error(MyError())
after retry -> Event error(MyError())
after retry -> isDisposed
after map -> isDisposed
after publish -> isDisposed
before publish -> Event next(4)
before publish -> Event next(5)
before publish -> Event next(6)
...
In the next example you showed, the combineLatest operator resubscribes to the hot observable, but the connectable observable does not resubscribe to its source.
Upvotes: 1