Winston Du
Winston Du

Reputation: 390

How do RxSwift operators affect resubscribes?

I am working with the retry operator in RxSwift. According to its documentation, it "resubscribes" to the source observable when it encounters an error.

This is all well and good. However, I'm not exactly sure how to reason about the "source observable" when it contains hot observables, or a mixture of hot/cold observables.

An example I am sure about:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let coldObservableRetry = coldObservable.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2) // retry is inclusive of original attempt
coldObservableRetry.subscribe(onNext: { print ($0)} ) // prints 1, 2, 1, 2, before erroring out.

An example I am not sure about:

let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let hotObservableRetry = hotObservable.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2)
hotObservableRetry.subscribe(onNext: { print ($0)} ) // What happens here?

Another example I am not sure about:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let delayedHotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).delay(.milliseconds(100), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservableRetry = Observable.combineLatest(coldObservable, delayedHotObservable).map { $0 + $1 }.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(5)
mixtureObservableRetry.subscribe(onNext: { print ($0)} ) // What happens here? What does it even mean to resubscribe to a combineLatest with a hot and a cold observable?

Yet another example I am not sure about:

let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish().connect()
let mixtureObservable2Retry = coldObservable.flatMapLatest { _ in hotObservable }
.flatMapLatest { num in 
      if num % 3 == 0 { return .just(num) } 
      else { return .error() }
    }
    .retry(2)
mixtureObservable2Retry.subscribe(onNext: { print ($0)} ) // What happens here?

Upvotes: 1

Views: 237

Answers (1)

Daniel T.
Daniel T.

Reputation: 33967

Many of your examples don't compile, so what happens is that you get an compile error. :-) But to answer the question asked...

There are some important things to keep in mind.

  1. Each operator subscribes to its source observable(s) and generates a new observable.
  2. When a cold observable is subscribed to, it will emit 0...N next events and then a stop event (which is either completed or error.) It will give each subscription its own set of events.
  3. Hot observables don't start emitting until their connect is called. All the subscriptions will share the same set of events.

Lastly in this case, debug is your friend.

So for your first example you aren't sure about (adjusted so it will compile and run with some debug operators added):

func example() {
    let hotObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        .debug("before publish")
        .publish()
    let hotObservableRetry = hotObservable
        .debug("after publish")
        .map { (num) -> Int in
            guard num % 3 != 0 else { throw MyError() }
            return num
        }
        .debug("after map")
        .retry(2)
        .debug("after retry")
    _ = hotObservableRetry.subscribe()
    _ = hotObservable.connect()
}

Will produce the below output.

Here are some things to notice about the output that will help in the learning process.

  • The subscribes happen in reverse order.
  • Once the first error is emitted, the retry operator re-subscribes to the map operator's observable which resubscribes to the publish's observable. Since the publish's observable is hot, the resubscribe chain stops there. The timer's observable does not get resubscribed.

The above are the key points to understand to answer this question.

after retry -> subscribed
after map -> subscribed
after publish -> subscribed
before publish -> subscribed
before publish -> Event next(0)
after publish -> Event next(0)
after map -> Event error(MyError())
after map -> isDisposed
after publish -> isDisposed
after map -> subscribed
after publish -> subscribed
before publish -> Event next(1)
after publish -> Event next(1)
after map -> Event next(1)
after retry -> Event next(1)
before publish -> Event next(2)
after publish -> Event next(2)
after map -> Event next(2)
after retry -> Event next(2)
before publish -> Event next(3)
after publish -> Event next(3)
after map -> Event error(MyError())
after retry -> Event error(MyError())
after retry -> isDisposed
after map -> isDisposed
after publish -> isDisposed
before publish -> Event next(4)
before publish -> Event next(5)
before publish -> Event next(6)
...

In the next example you showed, the combineLatest operator resubscribes to the hot observable, but the connectable observable does not resubscribe to its source.

Upvotes: 1

Related Questions