Dale

Reputation: 3323

RxSwift combineLatest emits all replayed elements from last Observable

I'm seeing some unexpected results when using combineLatest on a collection of cold observables. It emits the latest from all but the last Observable and instead combines the latest from the first (n-1) Observables with each element from the nth Observable.

import RxSwift

let disposeBag = DisposeBag()
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2)

latestObserver
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

Produces the output: (4, "bed") (4, "book") (4, "table")

I had expected to see output of just (4, "table").

If I change the order of the observables like so:

let latestObserver = Observable.combineLatest(observable2, observable)

Then I get the output: ("table", 1) ("table", 2) ("table", 3) ("table", 4)

If I add a final arbitrary Observable then I see just the latest from each of the first ones:

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2, observable3)

latestObserver
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

Produces the output: (4, "table", 42)

Is this really the expected behaviour?

Upvotes: 2

Views: 1314

Answers (1)

Daniel T.

Reputation: 33967

Let's break down what is happening in your first example...

You could use Observable.from instead of the subjects and get the same results. The steps in the code are as follows:

  1. The code creates two ReplaySubjects and loads them up with events.
  2. The combineLatest operator subscribes to the first subject.
  3. That first subject immediately replays all of its values.
  4. Since the second subject hasn't been subscribed to yet, the combineLatest operator doesn't emit anything. Instead, it silently absorbs the values, always storing only the latest one.
  5. The combineLatest operator then subscribes to the second subject.
  6. That subject then replays all of its values.
  7. Since the combineLatest operator has now received a next event from each of its sources, it emits each value from the second source as it arrives, combined with the latest (i.e., last) value from the first source.
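
The steps above can be modelled without RxSwift at all. The following is a minimal sketch, not RxSwift's actual implementation: ReplaySource and combineLatestSketch are hypothetical names invented for illustration, and the model assumes only that each source replays synchronously on subscription and that sources are subscribed to in argument order.

```swift
// Hypothetical stand-in for a replaying source: subscribing delivers every
// stored value synchronously, before subscribe(_:) returns.
struct ReplaySource<Element> {
    let values: [Element]

    func subscribe(_ onNext: (Element) -> Void) {
        values.forEach(onNext)
    }
}

// Simplified two-source combineLatest (an assumption-laden model): it
// subscribes to `first`, then to `second`, and emits only once both
// sources have produced at least one value.
func combineLatestSketch<A, B>(
    _ first: ReplaySource<A>,
    _ second: ReplaySource<B>,
    onNext: (A, B) -> Void
) {
    var latestA: A?
    var latestB: B?

    // Steps 2-4: the first source replays everything. Nothing is emitted
    // because latestB is still nil, so only the newest value is kept.
    first.subscribe { a in
        latestA = a
        if let b = latestB { onNext(a, b) }
    }

    // Steps 5-7: each replayed value is paired with the stored latestA.
    second.subscribe { b in
        latestB = b
        if let a = latestA { onNext(a, b) }
    }
}

let numbers = ReplaySource(values: [1, 2, 3, 4])
let words = ReplaySource(values: ["bed", "book", "table"])

var pairs: [String] = []
combineLatestSketch(numbers, words) { number, word in
    pairs.append("(\(number), \(word))")
}
print(pairs) // ["(4, bed)", "(4, book)", "(4, table)"]
```

Swapping the two arguments in this model gives one pair per number, each combined with "table", which matches the reordered output in the question.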

Your replay subjects are inherently synchronous. They emit all the values to the subscriber immediately upon subscription.
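
To make "synchronous" concrete, here is a small sketch of a replaying subject written in plain Swift. ReplayBufferSketch is a hypothetical, simplified type (no completion, errors, disposal, or thread safety), not the RxSwift ReplaySubject; it only illustrates that the buffered values reach the subscriber before the subscribe call returns.

```swift
// Hypothetical, simplified replay subject: it stores every value and
// replays the whole buffer to each new subscriber inside subscribe(_:).
final class ReplayBufferSketch<Element> {
    private var buffer: [Element] = []
    private var observers: [(Element) -> Void] = []

    func onNext(_ value: Element) {
        buffer.append(value)
        observers.forEach { $0(value) }
    }

    func subscribe(_ observer: @escaping (Element) -> Void) {
        buffer.forEach(observer)   // synchronous replay of past values
        observers.append(observer) // later values arrive as they are sent
    }
}

var log: [String] = []
let subject = ReplayBufferSketch<Int>()
subject.onNext(1)
subject.onNext(2)

log.append("before subscribe")
subject.subscribe { log.append("next \($0)") }
log.append("after subscribe")
subject.onNext(3)

print(log)
// ["before subscribe", "next 1", "next 2", "after subscribe", "next 3"]
```

Both buffered values are logged between "before subscribe" and "after subscribe", which is why combineLatest sees the whole history of one source before it has even subscribed to the next.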

In your last example code, since the last observable of the three emits only one value and only after the other two have emitted all of their values, you only see one output containing the latest from the previous two observables.

Upvotes: 1
