Reputation: 2129
I have an upstream which emits data in chunks. This stream should be throttled using throttleFirst. Additionally, after all throttle timers are finished, the last value should be emitted. Unfortunately there is no throttleFierstBuffered operator in RxJava 2, therefore I have implemented an ObservableTransformer:
upstream -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2);
return Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged();
}
It works well except for the disposal. After the resulting Observable is disposed, I want to dispose the upstream as well. How can I do that?
I have tried to access the disposable using autoConnect(2, disposable -> {}), but there must be a better way. So far I got this and I do not like it:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
Upvotes: 2
Views: 1450
Reputation: 2129
I am answering my own question here to my best knowledge, so please let me know if I am wrong.
Based on the comment from akarnokd, a solution will look like this:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
Second argument of autoConnect is an Action, which represents an established connection of 2 connected observers.
This can be used with emitter::setDisposable to dispose of the autoConnect when observers dispose of the resulting Observable.
Upvotes: 3