Reputation: 3506
I want to create an observable that emits values from an underlying hot observable (starting with -1 before any of them), but only when a new value differs from the previous one. Additionally, I want the latest value to be emitted immediately to new subscribers. I've come up with the following code:
PublishSubject<Integer> hotObservable = PublishSubject.create();
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(0);
However, this fails after the first value is emitted to a new subscriber (always -1, regardless of what hotObservable emits before subscribing to observable) with java.lang.IllegalStateException: more produced than requested.
Interestingly, when I don't automatically connect, but manually subscribe:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect();
observable.subscribe().unsubscribe();
the subsequent subscribers work properly, receiving the last value and then any updates.
I can't get replay(1).autoConnect(0) to work, and I feel like I'm missing something: why would subscribing and unsubscribing work, while autoConnect(0) wouldn't? What's the proper way to create such an observable?
Here's a test method that fails unless I use autoConnect(); observable.subscribe().unsubscribe():
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(); // With (0) it fails
observable.subscribe().unsubscribe(); // Needed if we don't auto connect
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValues(3);
Upvotes: 0
Views: 272
Reputation: 69997
I don't get the "more produced than requested" error with the code above on RxJava 1.1.3.
The reason the assertion fails is that replay won't request anything from upstream until one of its Subscribers actually requests. If the TestSubscriber is the first to subscribe, it triggers startWith to emit -1 and then switches to the PublishSubject, which doesn't retain any values, so you don't receive anything else.
I believe what you are looking for is BehaviorSubject, which keeps the very last value and starts with that for new subscribers:
BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);
Observable<Integer> observable = hotObservable.distinctUntilChanged();
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValue(3);
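To make the "keeps the very last value" behaviour concrete, here is a minimal plain-Java sketch with no RxJava dependency. The class name LatestValueSource and its methods are hypothetical, invented only for illustration. This is not how BehaviorSubject is implemented, and it ignores threading, completion, errors and unsubscription. It only mimics what a subscriber sees from BehaviorSubject followed by distinctUntilChanged(): the latest value on subscription, then each change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical illustration only; this is NOT RxJava's BehaviorSubject.
// Single-threaded, no completion, no error handling, no unsubscription.
final class LatestValueSource<T> {
    private final List<Consumer<? super T>> listeners = new ArrayList<>();
    private T latest;

    // The seed plays the role of the -1 passed to BehaviorSubject.create(-1).
    LatestValueSource(T initial) {
        this.latest = initial;
    }

    // Stores the new value and forwards it to current listeners.
    // Consecutive duplicates are dropped, which gives subscribers the same
    // view as putting distinctUntilChanged() after the subject.
    void onNext(T value) {
        if (Objects.equals(latest, value)) {
            return;
        }
        latest = value;
        // Iterate over a copy so a listener may subscribe others safely.
        for (Consumer<? super T> listener : new ArrayList<>(listeners)) {
            listener.accept(value);
        }
    }

    // Registers a listener and immediately hands it the retained value.
    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
        listener.accept(latest);
    }
}
```

With this sketch, seeding with -1, pushing 1, 2 and 3, and only then subscribing delivers 3 to the new listener, because the source itself retains the last value rather than relying on a downstream replay buffer being connected in time.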
Upvotes: 1