Sinisa Kendel

Reputation: 171

RxJava Thread.sleep() interrupted exception

I'm working on something with RxJava (a custom caching observable) and I ran into the following problem.

The example is straightforward: it creates an observable, subscribes to it, and immediately unsubscribes.

import java.io.IOException;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

public class Program {

    public static void main(String[] args) {
        // Source that blocks with Thread.sleep() before and after emitting one value
        Observable<Long> origin = Observable.create(new Observable.OnSubscribe<Long>() {
            @Override public void call(Subscriber<? super Long> subscriber) {
                try {
                    System.out.println("Starting... > " + Thread.currentThread().getName());
                    Thread.sleep(3000);
                    subscriber.onNext(System.currentTimeMillis());
                    Thread.sleep(200);
                    subscriber.onCompleted();
                    System.out.println("Complete...");
                } catch (Exception e) {
                    e.printStackTrace();
                    subscriber.onError(e);
                }
            }
        });

        // Run the OnSubscribe body on a new thread
        Subscription sub1 = origin.subscribeOn(Schedulers.newThread()).subscribe(new Observer<Long>() {
            @Override public void onCompleted() { }
            @Override public void onError(Throwable e) { e.printStackTrace(); }
            @Override public void onNext(Long value) {
                System.out.println("onNext() > " + value);
            }
        });

        // Unsubscribe right away, racing against the start of the new thread
        try {
            sub1.unsubscribe();
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            System.in.read(); // wait for user input to finish
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

If you run this program several times, you should eventually get the following exception (keep trying until it appears):

java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at org.skendel.Program$1.call(Program.java:23)
    at org.skendel.Program$1.call(Program.java:19)
    at rx.Observable.unsafeSubscribe(Observable.java:7710)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at org.skendel.Program$1.call(Program.java:23)
    at org.skendel.Program$1.call(Program.java:19)
    at rx.Observable.unsafeSubscribe(Observable.java:7710)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

My question is: why does this exception happen (only sometimes)? It happens even when a .cache() observable is created.

The exception is tied to the first Thread.sleep() call in the implementation of Observable.OnSubscribe.

Upvotes: 1

Views: 2870

Answers (3)

akarnokd

Reputation: 69997

Spinning up a new thread may take some time (even 1 ms), which gives the main thread plenty of time to unsubscribe and thus cancel the task scheduled by subscribeOn before it ever runs. Other times, the thread spins up so fast that it has already started running the OnSubscribe code, which is then interrupted by the unsubscribe call.
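
The two outcomes of that race can be reproduced without RxJava. The sketch below is only an analogy: it assumes that unsubscribing ends up cancelling the underlying task with interruption (the FutureTask frames in the question's stack trace suggest a Future sits underneath), and it uses a plain ExecutorService in place of the scheduler. The class and method names (CancelRaceDemo, cancelWhileSleeping, cancelBeforeStart) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CancelRaceDemo {

    /** Cancels a task that has already started; returns what the task observed. */
    static String cancelWhileSleeping() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not started");
        try {
            Future<?> task = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(3000);
                    outcome.set("completed");
                } catch (InterruptedException e) {
                    outcome.set("interrupted"); // the "sleep interrupted" case
                } finally {
                    finished.countDown();
                }
            });
            started.await();          // make sure the task is already running
            task.cancel(true);        // true = interrupt the running thread
            finished.await(5, TimeUnit.SECONDS);
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    /** Cancels a task that is still queued; the task body never runs. */
    static String cancelBeforeStart() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not started");
        try {
            // Occupy the only worker thread so the next task stays in the queue
            executor.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Future<?> task = executor.submit(() -> outcome.set("started"));
            task.cancel(true);        // cancelled while still queued
            release.countDown();      // free the worker
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel after start:  " + cancelWhileSleeping());
        System.out.println("cancel before start: " + cancelBeforeStart());
    }
}
```

In the question's program nothing forces either ordering, which is why the exception only shows up on some runs.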

As others mentioned, you should try to avoid sleeps in your code. Instead, set up the sequence with delays, or simply use Scheduler.Worker.schedule() to trigger actions after some time.
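
To illustrate the idea of scheduling instead of sleeping, here is a minimal sketch that uses java.util.concurrent.ScheduledExecutorService as a stand-in for an RxJava Scheduler.Worker (an assumption made so the example runs without RxJava on the classpath). The names DelayedEmitDemo and emitsWithinOneSecond are hypothetical. Because the delayed action is not running while it waits, cancelling it before the delay elapses simply drops it and there is no sleeping thread to interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedEmitDemo {

    /**
     * Schedules an "emit" action 200 ms in the future instead of sleeping.
     * Returns true if the action ran within one second.
     */
    static boolean emitsWithinOneSecond(boolean cancelFirst) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch emitted = new CountDownLatch(1);
        try {
            ScheduledFuture<?> pending =
                    scheduler.schedule(() -> emitted.countDown(), 200, TimeUnit.MILLISECONDS);
            if (cancelFirst) {
                // The action has not started yet, so it is dropped without any interruption
                pending.cancel(true);
            }
            return emitted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("emitted without cancel: " + emitsWithinOneSecond(false));
        System.out.println("emitted after cancel:   " + emitsWithinOneSecond(true));
    }
}
```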

Upvotes: 1

JockX

Reputation: 2126

My assumption is that the exception does NOT happen only when the main thread reaches the sub1.unsubscribe(); line before the Observable goes to sleep, because it is the unsubscribe call that wakes the Observable the hard way - it interrupts the thread that origin is subscribed on.

This exception seems expected, so handle it separately, not in the onError() method.
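
One possible way to do that, sketched without RxJava: catch InterruptedException in its own branch, restore the interrupt flag, and return quietly, while other failures still go to the error callback. The produce method below is a hypothetical stand-in for the body of OnSubscribe.call(), and the Consumer parameters stand in for onNext and onError; none of these names come from the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class InterruptHandlingDemo {

    /** Hypothetical stand-in for the blocking body of OnSubscribe.call(). */
    static void produce(long sleepMillis, Consumer<Long> onNext, Consumer<Throwable> onError) {
        try {
            Thread.sleep(sleepMillis);
            onNext.accept(System.currentTimeMillis());
        } catch (InterruptedException e) {
            // Expected when the work is cancelled: restore the flag and stop quietly
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Anything else is a real failure and is reported downstream
            onError.accept(e);
        }
    }

    /** Interrupts a worker running produce() and returns how many errors were reported. */
    static int errorsAfterInterrupt() {
        List<Throwable> errors = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> produce(3000, value -> { }, errors::add));
        worker.start();
        worker.interrupt(); // plays the role of the unsubscribe call
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return errors.size();
    }

    public static void main(String[] args) {
        System.out.println("errors reported: " + errorsAfterInterrupt());
    }
}
```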

Upvotes: 0

Guillaume F.

Reputation: 6463

You are trying to unsubscribe from an event that is currently running (it is inside Thread.sleep()). Your library apparently detects this and tries to wake your event in order to stop it. You should not rely on sleep() in your code anyway; there is always a better, more proper way. Plus, as a rule of thumb, an event should always be as fast as possible.

The Exception won't occur if you unsubscribe fast enough: your event didn't fire, it's not sleeping, so there's no exception.

Try delaying the unsubscribe by a second: the event should then fire every time, and throw the exception every time too.

Upvotes: 3
