Reputation: 460
When I run the following snippet, I don't see any backpressure.
    public static void main(String[] args) throws InterruptedException {
        MyFileProcessor pro = new MyFileProcessor();
        Timer t = new Timer();
        t.start();
        Disposable x = pro
                .generateFlowable(new File("path\\to\\file.raw"))
                .subscribeOn(Schedulers.io(), false)
                .observeOn(Schedulers.io())
                .map(y -> {
                    System.out.println(Thread.currentThread().getName() + " xxx");
                    return y;
                })
                .subscribe(onNext -> {
                    System.out.println(Thread.currentThread().getName() + " " + new String(onNext));
                    Thread.sleep(100);
                }, Throwable::printStackTrace, () -> {
                    System.out.println("Done");
                    t.end();
                    System.out.println(t.getTotalTime());
                });
        Thread.sleep(1000000000);
    }
When I run the class above, I get alternating lines of
RxCachedThreadScheduler-1 xxx
RxCachedThreadScheduler-1 Line1
....
It's using the same thread.
Now when I move the observeOn to just before the subscribe, I see a bunch of
RxCachedThreadScheduler-1 xxx
Followed by a bunch of
RxCachedThreadScheduler-1 Line1
I am assuming this is backpressure, but the thread used is still the same.
Why am I seeing this behavior?
Why is only one thread being utilized?
In that case there is no operator left after the observeOn for it to act on, so why am I seeing this behavior?
[edit]
    public Flowable<byte[]> generateFlowable(File file) {
        return Flowable.generate(() -> new BufferedInputStream(new FileInputStream(file)), (bufferedIs, output) -> {
            try {
                byte[] data = getMessageRawData(bufferedIs);
                if (data != null)
                    output.onNext(data);
                else
                    output.onComplete();
            }
            catch (Exception e) {
                output.onError(e);
            }
            return bufferedIs;
        }, bufferedIs -> {
            try {
                bufferedIs.close();
            }
            catch (IOException ex) {
                RxJavaPlugins.onError(ex);
            }
        });
    }
Upvotes: 0
Views: 37
Reputation: 69997
Why is only one thread being utilized?
This works correctly: you check the running thread after observeOn, so you are supposed to see the same thread there and below, no matter what happens above it. subscribeOn affects generateFlowable where, I suppose, you don't print the current thread, and thus you don't see that it runs on a different IO thread.
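To make the hand-off concrete, here is a rough plain-JDK analogy (not RxJava itself). The two single-thread executors are hypothetical stand-ins for two Schedulers.io() workers, and the class and stage names are made up for illustration. The "generate" stage logs one thread name, while everything after the hand-off ("map" and "consume") logs the other, which is why printing only below observeOn always shows a single thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadHandoffSketch {

    // Runs a three-item pipeline and returns "stage item@thread" log entries.
    static List<String> run() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();

        // Assumption: each executor plays the role of one scheduler worker.
        ExecutorService sourceThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "source-thread"));
        ExecutorService downstreamThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream-thread"));

        // The producer runs on the source thread (the part subscribeOn would move).
        sourceThread.execute(() -> {
            for (int i = 1; i <= 3; i++) {
                final int item = i;
                log.add("generate " + item + "@" + Thread.currentThread().getName());

                // Hand-off point (the part observeOn would do): everything
                // from here on runs on the downstream thread.
                downstreamThread.execute(() -> {
                    log.add("map " + item + "@" + Thread.currentThread().getName());
                    log.add("consume " + item + "@" + Thread.currentThread().getName());
                });
            }
        });

        // Wait for the producer first so every hand-off has been submitted,
        // then let the downstream thread drain its queue.
        sourceThread.shutdown();
        sourceThread.awaitTermination(5, TimeUnit.SECONDS);
        downstreamThread.shutdown();
        downstreamThread.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

In the real pipeline, the equivalent check is to print Thread.currentThread().getName() inside the generator callback as well as in map and subscribe, and compare the names.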
Now when I move the observeOn to just before the subscribe
There shouldn't be any difference unless something odd happens in generateFlowable.
Upvotes: 1