Reputation: 171
I'm working on something with RxJava (a custom caching observable) and I ran into the following problem.
The example is straightforward: it creates an observable, subscribes to it, and immediately unsubscribes.
    import java.io.IOException;

    import rx.Observable;
    import rx.Observer;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.schedulers.Schedulers;

    public class Program {
        public static void main(String[] args) {
            // Source that blocks with Thread.sleep() before and after emitting one value.
            Observable<Long> origin = Observable.create(new Observable.OnSubscribe<Long>() {
                @Override public void call(Subscriber<? super Long> subscriber) {
                    try {
                        System.out.println("Starting... > " + Thread.currentThread().getName());
                        Thread.sleep(3000);
                        subscriber.onNext(System.currentTimeMillis());
                        Thread.sleep(200);
                        subscriber.onCompleted();
                        System.out.println("Complete...");
                    } catch (Exception e) {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                }
            });
            // Subscribe on a new thread...
            Subscription sub1 = origin.subscribeOn(Schedulers.newThread()).subscribe(new Observer<Long>() {
                @Override public void onCompleted() { }
                @Override public void onError(Throwable e) { e.printStackTrace(); }
                @Override public void onNext(Long value) {
                    System.out.println("onNext() > " + value);
                }
            });
            // ...and unsubscribe immediately from the main thread.
            try {
                sub1.unsubscribe();
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                System.in.read(); // wait for user input to finish
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
If you run this program several times, you should eventually get the following exception (keep trying until it appears):
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.skendel.Program$1.call(Program.java:23)
at org.skendel.Program$1.call(Program.java:19)
at rx.Observable.unsafeSubscribe(Observable.java:7710)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.skendel.Program$1.call(Program.java:23)
at org.skendel.Program$1.call(Program.java:19)
at rx.Observable.unsafeSubscribe(Observable.java:7710)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
My question is: why does this exception (sometimes) happen? It happens even when the observable is created with .cache().
The exception is tied to the first Thread.sleep() call in the implementation of Observable.OnSubscribe.
Upvotes: 1
Views: 2870
Reputation: 69997
Spinning up a new thread takes some time (even if only 1 ms), which can give the main thread enough time to unsubscribe and thus cancel the task scheduled by subscribeOn before it starts. Other times, the thread spins up so fast that it is already running the OnSubscribe code, which is then interrupted by the unsubscribe call.
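The two outcomes can be reproduced deterministically with plain java.util.concurrent, which the stack trace shows sitting underneath the RxJava scheduler (FutureTask, ScheduledThreadPoolExecutor). This is only an analogy under that assumption, not RxJava's actual code: Future.cancel(true) stands in for unsubscribe(), and the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRace {

    /** Cancels a task that is already running; returns true if its sleep was interrupted. */
    public static boolean cancelAfterStart() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> task = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();      // the worker thread is now inside the task
            task.cancel(true);    // stand-in for unsubscribe(): interrupts the worker
            finished.await(5, TimeUnit.SECONDS);
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    /** Cancels a task while it is still queued; returns true if the task body ever ran. */
    public static boolean cancelBeforeStart() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            // Keep the only worker busy so the next task stays in the queue.
            executor.submit(() -> { gate.await(); return null; });
            Future<?> task = executor.submit(() -> { ran.set(true); });
            task.cancel(true);    // cancelled before any thread picked it up
            gate.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return ran.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ran.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel after start, sleep interrupted: " + cancelAfterStart());
        System.out.println("cancel before start, task ran: " + cancelBeforeStart());
    }
}
```

In the question's program nothing forces either ordering, so which branch you hit depends on how quickly the new thread starts.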
As others mentioned, you should avoid sleeps in your code. Instead, set up the sequence with delays, or simply use Scheduler.Worker.schedule() to trigger actions after some time.
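As a rough illustration of why scheduling beats sleeping, here is a sketch that uses the JDK's ScheduledExecutorService in place of an RxJava Scheduler.Worker, so that it runs without extra dependencies. The class and method names are hypothetical. The point is that a delayed action which is cancelled before it fires never occupies a thread, so there is no sleep to interrupt.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedEmit {

    /** Schedules one delayed action, optionally cancels it at once, and returns how often it fired. */
    public static int fireCount(boolean cancelImmediately) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        try {
            // No thread blocks during the 200 ms delay; the action simply waits in a queue.
            ScheduledFuture<?> pending =
                    scheduler.schedule(() -> { fired.incrementAndGet(); }, 200, TimeUnit.MILLISECONDS);
            if (cancelImmediately) {
                pending.cancel(true); // nothing is running yet, so nothing is interrupted
            }
            // By default, already-scheduled delayed tasks still run after shutdown().
            scheduler.shutdown();
            scheduler.awaitTermination(2, TimeUnit.SECONDS);
            return fired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fired.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fired without cancel: " + fireCount(false));
        System.out.println("fired with cancel: " + fireCount(true));
    }
}
```

With RxJava itself, the corresponding idea would be to schedule the emission on a worker (or use a delay operator) and let unsubscribe cancel the pending action.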
Upvotes: 1
Reputation: 2126
My assumption is that the exception does NOT happen only if the main thread reaches the sub1.unsubscribe(); line before the Observable goes to sleep, because it is the unsubscribe call that wakes the Observable the hard way: it interrupts and discards the thread that origin is subscribed on.
This exception seems expected, so handle it separately rather than in the onError() method.
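A minimal sketch of what handling it separately could look like, written against a hypothetical Sink interface rather than RxJava's Subscriber so that it stays self-contained. The assumption here is that an interrupt during the blocking call means the consumer cancelled, so the producer restores the interrupt flag and returns quietly instead of reporting an error.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class QuietCancel {

    /** Hypothetical stand-in for an Rx-style subscriber; not an RxJava type. */
    public interface Sink {
        void onNext(long value);
        void onError(Throwable error);
    }

    /** Blocking producer that treats interruption as cancellation, not as an error. */
    public static void produce(Sink sink) {
        try {
            Thread.sleep(3000);
            sink.onNext(System.currentTimeMillis());
        } catch (InterruptedException e) {
            // Expected when the consumer cancels: restore the flag and stop quietly.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Anything else is a genuine failure and is reported downstream.
            sink.onError(e);
        }
    }

    /** Runs produce() on a new thread, interrupts it, and returns the events the sink saw. */
    public static List<String> interruptAndCollect() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> produce(new Sink() {
            @Override public void onNext(long value) { events.add("next"); }
            @Override public void onError(Throwable error) { events.add("error"); }
        }));
        worker.start();
        worker.interrupt();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("events after interrupt: " + interruptAndCollect());
    }
}
```

In the question's code, the same split would mean catching InterruptedException ahead of the generic catch block, so that the stack trace is no longer printed and onError() is not called for a plain unsubscribe.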
Upvotes: 0
Reputation: 6463
You are trying to unsubscribe from an event that is currently running (inside Thread.sleep()). Your library apparently detects this and tries to wake your event in order to stop it. You should not rely on Thread.sleep() in your code anyway; there is always a better, more proper way. Plus, as a rule of thumb, an event should always be as fast as possible.
The exception won't occur if you unsubscribe fast enough: the event has not fired yet, so it is not sleeping and there is nothing to interrupt.
Try delaying the unsubscribe by a second: the event should then fire every time, and the exception should be thrown every time too.
Upvotes: 3