dripto

Reputation: 619

RxJava: Using SubscribeOn makes the program exit without completing

This is my sample code:

Observable.range(1,5)
            .subscribeOn(Schedulers.computation())
            .map(Observables05::doSomething)
            .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));

My doSomething method is:

public static int doSomething(int i) {
    try {
        System.out.println("Processing " + i +
                " on Thread -- " + Thread.currentThread().getName());
        Thread.sleep(500);
        return i;
    } catch (InterruptedException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

If my main method contains only the sample code, the program simply exits without printing anything. However, if I add Thread.sleep(3000) after it, the program works correctly and exits once the sleep ends.

Is this the expected behaviour, and if so, why? How can I make this code run to completion without using Thread.sleep?

Upvotes: 2

Views: 1043

Answers (3)

Pavan Kumar

Reputation: 4820

That's the expected behavior. Since you are running this from a main method, a simple solution is to call toBlocking() before you subscribe, as below:

    Observable.range(1,5)
    .subscribeOn(Schedulers.computation())
    .map(DummyJunk::doSomething)
    .toBlocking()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));

Upvotes: 1

MatBos

Reputation: 2390

If you want to test the code you have written, use TestSubscriber.

Code would look like this:

TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();

Observable.range(1,5)
        .subscribeOn(Schedulers.computation())
        .map(Observables05::doSomething)
        .subscribe(testSubscriber);

testSubscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);

Keep in mind that TestSubscriber lets you test a lot more than just the terminal event.

Upvotes: 0

Yaroslav Stavnichiy

Reputation: 21446

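subscribeOn schedules the subscribe call on a background thread (you have selected the computation scheduler). Once that is scheduled, your main thread is free to continue, which here means returning from main. The computation scheduler's threads are daemon threads, so they do not keep the JVM alive, and the program terminates.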

You need some way to wait for all the tasks you care about to complete before exiting. Thread.sleep(3000) does the job for simple test cases.

Real programs usually do not terminate that fast. Still, there are cases when you need to wait for a background task to complete. There are various thread synchronization mechanisms (e.g. CountDownLatch) that you can use for that.
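
A minimal sketch of the CountDownLatch approach, using only the JDK so it runs without RxJava on the classpath. The daemon thread is a stand-in for the computation scheduler, and the class name LatchDemo and the helper runAndWait are hypothetical names chosen for illustration. With the code from the question, the equivalent would presumably be to call countDown() in both the onError and the onCompleted callbacks and await() the latch at the end of main.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {

    // Hypothetical helper: runs work on a daemon thread (a stand-in for the
    // computation scheduler) and blocks the caller until the work is finished.
    static List<Integer> runAndWait() throws InterruptedException {
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    Thread.sleep(50); // simulate slow processing
                    results.add(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Signal completion on success and on failure alike,
                // so the waiting thread is never left hanging.
                done.countDown();
            }
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();

        // Without this wait, the caller could return before the worker is done.
        if (!done.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for worker");
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait());
    }
}
```

The timeout on await is a design choice: it keeps a lost completion signal from blocking the main thread forever. Removing the await call reproduces the behaviour from the question, where main returns while the background work is still running.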

Upvotes: 3
