Reputation: 11559
I implemented my own custom Operator called DoCountOperator
, which supports custom Observable
operators I created like doOnNextCount()
, doOnErrorCount()
, and doOnCompletedCount()
. It allows creating a side effect for each of these events and doing something with the emission count at that point. For example, doOnCompletedCount()
allows you to create a side effect with the full emission count after the onCompleted()
event.
I am running into a MissingBackpressureException
problem for a large number of emissions though, and my stack trace mentions my operator. Here is the Operator
...
/**
 * Operator that passes every event through to the downstream subscriber unchanged
 * while keeping a running count of the items seen, and reports that count to a
 * {@link CountObserver} on each onNext, onError and onCompleted event.
 *
 * NOTE(review): the Subscriber returned from call() is created with the no-arg
 * constructor and this class never calls child.add(...) nor overrides setProducer(...).
 * Presumably this means downstream unsubscription and request(n) calls never reach
 * the upstream, which would explain a MissingBackpressureException - confirm against
 * the RxJava 1.x Subscriber documentation.
 */
final class DoCountOperator<T> implements Observable.Operator<T,T> {
/** Callback that receives the emission count observed so far at each kind of event. */
interface CountObserver {
/** Invoked for every item; emissionCount already includes the current item. */
void onNext(int emissionCount);
/** Invoked when an error is signalled; emissionCount is the number of items seen before it. */
void onError(int emissionCount);
/** Invoked on completion; emissionCount is the total number of items seen. */
void onCompleted(int emissionCount);
}
// Receiver of the count side effects; shared by every subscriber created in call().
private final DoCountOperator.CountObserver doObserver;
/**
 * @param doObserver callback notified with the emission count on each event
 */
DoCountOperator(DoCountOperator.CountObserver doObserver) {
this.doObserver = doObserver;
}
/**
 * Wraps the downstream subscriber in a counting subscriber.
 *
 * @param child the downstream subscriber that receives the forwarded events
 * @return the subscriber to be subscribed to the upstream source
 */
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
return new Subscriber<T>() {
// Number of onNext calls received so far by this subscriber.
private int count = 0;
// Set once completion has been forwarded; later events are then dropped.
private boolean done = false;
@Override
public void onCompleted() {
if (done) {
return;
}
try {
doObserver.onCompleted(count);
}catch (Throwable throwable) {
//Exceptions.throwIfFatal(throwable);
// A failing side effect is reported downstream as an error.
// NOTE(review): there is no return here, so after onError(...) the code below
// still runs and the child receives onCompleted() as well.
onError(throwable);
}
done = true;
child.onCompleted();
}
@Override
public void onError(Throwable throwable) {
if (done) {
return;
}
// NOTE(review): done is not set on this path, so events arriving after an error
// are still processed.
try {
doObserver.onError(count);
} catch (Throwable throwable1) {
// Best effort: a failure inside the side effect is only printed, and the
// original error is still forwarded below.
throwable1.printStackTrace();
}
child.onError(throwable);
}
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
// Pre-increment: the observer sees the count including the current item.
doObserver.onNext(++count);
}catch (Throwable throwable) {
// Presumably rethrows only errors RxJava treats as fatal; any other exception
// from the side effect is dropped and the item is still forwarded.
Exceptions.throwIfFatal(throwable);
}
child.onNext(t);
}
};
}
}
And here is how doOnCompletedCount()
uses it. I get the error when the source emits somewhere between 50,000 and 90,000 items.
/**
 * Builds an operator that runs a side effect with the emission count once the
 * source completes.
 *
 * @param countAction receives the emission count that DoCountOperator reports on completion
 * @param <T> the item type flowing through the operator
 * @return an operator that forwards all events unchanged
 */
public static <T> Observable.Operator<T,T> doOnCompletedCount(final IntConsumer countAction) {
    // Only completion is of interest here; the per-item and error callbacks are deliberate no-ops.
    final DoCountOperator.CountObserver completionOnly = new DoCountOperator.CountObserver() {
        @Override
        public void onNext(int ignoredCount) {
            // no side effect per item
        }

        @Override
        public void onError(int ignoredCount) {
            // no side effect on error
        }

        @Override
        public void onCompleted(int totalEmissions) {
            countAction.accept(totalEmissions);
        }
    };
    return new DoCountOperator<T>(completionOnly);
}
I cannot think of how this operator might be overwhelmed by backpressure, but perhaps it is failing to communicate a backpressure problem across the chain? What exactly am I doing wrong? Here is the stack trace...
rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:352)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:346)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:329)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:117)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at com.swa.rm.common.rx.DoCountOperator$1.onNext(DoCountOperator.java:66)
at rx.internal.operators.OperatorFinally$1.onNext(OperatorFinally.java:48)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at com.swa.rm.common.rx.RxOperators$1.onNext(RxOperators.java:49)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorFinally$1.onNext(OperatorFinally.java:48)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$ReplayProducer.request(CachedObservable.java:304)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:244)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
at rx.internal.operators.OperatorToObservableSortedList$2.onCompleted(OperatorToObservableSortedList.java:82)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onCompleted(OperatorGroupBy.java:264)
at rx.observers.Subscribers$5.onCompleted(Subscribers.java:220)
at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:155)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:339)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onCompleted(OperatorGroupBy.java:161)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onCompleted(CachedObservable.java:211)
at rx.internal.operators.CachedObservable$CacheState$1.onCompleted(CachedObservable.java:179)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:614)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:255)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onCompleted(CachedObservable.java:211)
at rx.internal.operators.CachedObservable$CacheState$1.onCompleted(CachedObservable.java:179)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorBufferWithSize$1.onCompleted(OperatorBufferWithSize.java:126)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:183)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:65)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:167)
at rx.internal.operators.OperatorConcat$ConcatSubscriber.onCompleted(OperatorConcat.java:150)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:101)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.CachedObservable$CacheState.connect(CachedObservable.java:183)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:248)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Upvotes: 2
Views: 293
Reputation: 69997
You forgot to link the downstream Subscriber's unsubscription and backpressure "channels" in your code. You need something like this:
/**
 * Operator that forwards every event to the downstream subscriber unchanged while
 * keeping a running count of the items seen, and reports that count to a
 * {@link CountObserver} on each onNext, onError and onCompleted event.
 *
 * <p>The counting subscriber is registered with the child via {@code child.add(parent)}
 * and hands the upstream {@code Producer} to the child via {@code setProducer}, so
 * unsubscription and backpressure requests travel through this operator.
 *
 * <p>At most one terminal event is forwarded downstream: once onError or onCompleted
 * has been handled, all further events are ignored.
 */
final class DoCountOperator<T> implements Observable.Operator<T,T> {

    /** Callback that receives the emission count observed so far at each kind of event. */
    interface CountObserver {
        /** Invoked for every item; emissionCount already includes the current item. */
        void onNext(int emissionCount);

        /** Invoked when an error is signalled; emissionCount is the number of items seen before it. */
        void onError(int emissionCount);

        /** Invoked on completion; emissionCount is the total number of items seen. */
        void onCompleted(int emissionCount);
    }

    private final DoCountOperator.CountObserver doObserver;

    /**
     * @param doObserver callback notified with the emission count on each event
     */
    DoCountOperator(DoCountOperator.CountObserver doObserver) {
        this.doObserver = doObserver;
    }

    /**
     * Wraps the downstream subscriber in a counting subscriber and links the two.
     *
     * @param child the downstream subscriber that receives the forwarded events
     * @return the subscriber to be subscribed to the upstream source
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        Subscriber<T> parent = new Subscriber<T>() {
            private int count = 0;
            // True once a terminal event (error or completion) has been handled.
            private boolean done = false;

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                try {
                    doObserver.onCompleted(count);
                } catch (Throwable throwable) {
                    Exceptions.throwIfFatal(throwable);
                    // The side effect failed: signal an error INSTEAD of completion.
                    // Returning here keeps the child from getting two terminal events.
                    onError(throwable);
                    return;
                }
                done = true;
                child.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                if (done) {
                    return;
                }
                // Mark as terminated first so nothing is forwarded after the error.
                done = true;
                try {
                    doObserver.onError(count);
                } catch (Throwable sideEffectFailure) {
                    // Best effort: the original error is the one worth delivering, so a
                    // failure in the side effect is only printed.
                    sideEffectFailure.printStackTrace();
                }
                child.onError(throwable);
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                try {
                    // Pre-increment: the observer sees the count including the current item.
                    doObserver.onNext(++count);
                } catch (Throwable throwable) {
                    Exceptions.throwIfFatal(throwable);
                    // Do not swallow the failure: stop the upstream and report it,
                    // consistent with how a failing onCompleted side effect is handled.
                    unsubscribe();
                    onError(throwable);
                    return;
                }
                child.onNext(t);
            }

            @Override
            public void setProducer(Producer p) {
                // Pass the upstream producer to the child so its request(n) calls
                // reach the source (backpressure).
                child.setProducer(p);
            }
        };
        // Unsubscribing the child now also unsubscribes the counting subscriber.
        child.add(parent);
        return parent;
    }
}
So instead of returning the custom Subscriber on call(), you put it into a local variable and call child.add(parent)
thus establishing the unsubscription chain. For the backpressure to work, the best approach is to override setProducer
in the custom Subscriber and simply call child.setProducer
with the received Producer
instance.
Upvotes: 3