chhil
chhil

Reputation: 460

Placement of observeOn gives me different behavior

When I run the following snippet I don't see the backpressure.

public static void main(String[] args) throws InterruptedException {

    MyFileProcessor pro = new MyFileProcessor();
    Timer t = new Timer();
    t.start();
    Disposable x = pro
            .generateFlowable(
                    new File("path\\to\\file.raw"))
            .subscribeOn(Schedulers.io(), false).observeOn(Schedulers.io()).map(y -> {
                System.out.println(Thread.currentThread().getName() + " xxx");

                return y;
            })

            .subscribe(onNext -> {
                System.out.println(Thread.currentThread().getName() + " " + new String(onNext));
                Thread.sleep(100);
            }, Throwable::printStackTrace, () -> {
                System.out.println("Done");
                t.end();
                System.out.println(t.getTotalTime());
            });

    Thread.sleep(1000000000);

}  

When I run the class above I get an alternating lines of

RxCachedThreadScheduler-1 xxx
RxCachedThreadScheduler-1 Line1
....

Its using the same thread.

Now when I move the observeOn to just before the subscribe, I see a bunch of RxCachedThreadScheduler-1 xxx
Followed by a bunch of RxCachedThreadScheduler-1 Line1

I am assuming this is back pressure but still the thread used is the same.

Why am I seeing this behavior?
Why is only one thread being utilized?
There is no operator as such for the observeOn to operate on, so why am I seeing this behavior?

[edit]

public Flowable<byte[]> generateFlowable(File file) {
    return Flowable.generate(() -> new BufferedInputStream(new FileInputStream(file)), (bufferedIs, output) -> {
        try {
            byte[] data = getMessageRawData(bufferedIs);
            if (data != null)
                output.onNext(data);
            else
                output.onComplete();

        }
        catch (Exception e) {
            output.onError(e);
        }
        return bufferedIs;
    }, bufferedIs -> {
        try {
            bufferedIs.close();
        }
        catch (IOException ex) {
            RxJavaPlugins.onError(ex);
        }
    });

}  

Upvotes: 0

Views: 37

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Why is only one thread being utilized?

Works correctly because you check the running thread after observeOn and thus you are supposed to see the same thread there and below, no matter what happens above it. subscribeOn affects generateFlowable where, I suppose, you don't print the current thread and thus you don't see it runs on a different IO thread.

Now when I move the observeOn to just before the subscribe

There shouldn't be any difference unless something odd happens in generateFlowable.

Upvotes: 1

Related Questions