Reputation: 2408
When you have a large Observable graph (i.e. an Observable composed many times using merge, groupBy, join, etc.) and an exception is thrown, it is sometimes difficult to figure out where the exception originated. I would like to know whether it is possible to find out where in the source file the Observable operators were invoked. An example should make this clearer.
For example, given the following IllegalStateException: Only one subscriber allowed! and stack trace, I would like to know whether it is possible to find out which lines of my source files OperatorMerge, OperatorFilter, OperatorGroupBy, etc. were invoked from. Is it possible to do this somehow, whether with a debugger, print statements, or otherwise?
java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.unsafeSubscribe(Observable.java:7531)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:251)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.java:236)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.unsafeSubscribe(Observable.java:7531)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
at rx.internal.operators.**OperatorMerge**$MergeSubscriber.onNext(OperatorMerge.java:120)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80)
at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63)
at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:93)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44)
at rx.internal.operators.**OperatorFilter**$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(OperatorTakeUntilPredicate.java:54)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
at rx.internal.operators.**OperatorGroupBy**$GroupBySubscriber$2$2.onNext(OperatorGroupBy.java:286)
at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:340)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:226)
at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124)
at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:560)
at rx.internal.operators.**OperatorPublish**$PublishSubscriber.onNext(OperatorPublish.java:258)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:98)
at rx.Subscriber.setProducer(Subscriber.java:177)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:50)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:7531)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:215)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:185)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124)
at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:560)
at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:258)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGroupBy.java:286)
at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:340)
at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:226)
at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198)
...
This problem arises essentially because the whole point of an Observable is to decouple the code from when it is executed, which makes debugging a nightmare. So, to repeat my question above: is it possible to trace each composition back to its original line in the source code?
Upvotes: 3
Views: 473
Reputation: 2408
A year on I still struggle with this, and have still not found a good way to trace executions. I rely on putting print statements in the code to see what's going on; that's the only way I can get traces of what is happening.
The only thing I've found helpful is to create a pattern for this, so I don't have to write doOnNext(x => println(x)) every single time I want to show what's going on:
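// Must be declared inside an object, class, or trait (Scala 2 implicit classes cannot be top-level).
implicit class ObservableTrace[T](o: rx.lang.scala.Observable[T]) {
  import java.time.LocalTime

  // Logs every lifecycle event of the wrapped Observable, tagged with `name` and a timestamp.
  def trace(name: String): rx.lang.scala.Observable[T] = {
    def print(s: String): Unit = println(s"${LocalTime.now} : $name : $s")
    o.doOnNext(x => print("next:" + x))
      .doOnSubscribe(print("subscribed"))
      .doOnCompleted(print("completed"))
      .doOnError(e => print("error: " + e))
      .doOnUnsubscribe(print("unsubscribed"))
  }
}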
This makes instrumenting code quick: just write myobservable.trace("My Observable") on a few of your observables, and it becomes easy to see when the different lifecycle events occur.
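One way to avoid inventing a name for every trace is to record where the chain was assembled. The stack trace in the question contains only library frames because it is captured when items flow, so the capture has to happen at composition time instead. The following is only a sketch using the JDK's stack trace API; the CallSite object, the here method and the list of skipped package prefixes are hypothetical names and choices, not part of RxScala.

```scala
object CallSite {
  // Package prefixes treated as "not user code". This list is an assumption; adjust as needed.
  private val skipped = Seq("rx.", "scala.", "java.", "sun.")

  /** Returns "File.scala:line" for the nearest caller outside this helper and the skipped packages. */
  def here(): Option[String] = {
    val self = getClass.getName
    new Throwable().getStackTrace
      .find { frame =>
        val cls = frame.getClassName
        cls != self && !skipped.exists(p => cls.startsWith(p))
      }
      .map(frame => s"${frame.getFileName}:${frame.getLineNumber}")
  }
}
```

It could then be combined with the helper above, for example myobservable.trace("merge at " + CallSite.here().getOrElse("?")), so that each log line carries the source location where that part of the graph was composed.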
Upvotes: 0
Reputation: 69997
There were some experiments with adding extra debug information, but they made the whole library run 100x slower, so the effort was abandoned.
The problem is likely in a flatMap following your groupBy, where you subscribe to the GroupedObservable yourself and also hand it back to flatMap, which then can't subscribe to it: a GroupedObservable can be consumed only once. You need to use one of the publish() or replay() operators and adjust the function logic accordingly.
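As a rough illustration of that suggestion, here is a minimal sketch assuming RxScala's publish overload that takes a selector function. The data, the key function and the GroupByOnce object are made up for the example, and your own pipeline will differ. The failing shape is kept as a comment; the working version shares each group through publish so that the group itself is subscribed to only once.

```scala
import rx.lang.scala.Observable

object GroupByOnce {
  // Hypothetical data and key function, for illustration only.
  def run(): List[String] =
    Observable.from(1 to 6)
      .groupBy(_ % 2) // emits (key, group) pairs
      .flatMap { case (key, group) =>
        // Failing shape: two subscriptions to the same group.
        //   group.subscribe(x => println(x)) // first subscriber
        //   group.map(_.toString)            // flatMap subscribes a second time
        //
        // Working shape: publish subscribes to the group once and lets
        // both derived streams read from the shared view.
        group.publish { (shared: Observable[Int]) =>
          val values = shared.map(x => s"key=$key value=$x")
          val count  = shared.length.map(n => s"key=$key count=$n")
          values.merge(count)
        }
      }
      .toBlocking
      .toList
}
```

The selector form is used here because the connection happens only after the downstream has subscribed, which avoids having to call connect by hand inside the flatMap function.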
Upvotes: 1