Reputation: 163
In short, is there any solution to resolve backpressure in RxJava without resorting to dropping items, serializing operations or unbounded buffering?
Consider the following task as an example of when this could be useful: read a file from disk in blocks, compress each block, and transmit the compressed blocks over the network.
The straightforward approach is to perform all three steps sequentially on a single background thread, as in:
observeBlocksOfFileContents(file)
    .subscribeOn(backgroundScheduler)
    .map(compressBlock)
    .subscribe(transmitBlock);
While this works without issue, it is suboptimal from a performance perspective: the total runtime is the sum of the three operations, whereas running them in parallel brings it down to roughly the slowest of the three:
observeBlocksOfFileContents(file)
    .subscribeOn(diskScheduler)
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);
This can, however, fail due to backpressure if data is read from disk faster than it can be compressed and transmitted. The usual backpressure remedies are undesirable here: dropping items would lose parts of the file, serializing the operations is just the sequential approach above, and unbounded buffering could hold an arbitrarily large portion of the file in memory.
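For reference, a minimal sketch of where those stock operators would sit in the parallel pipeline (onBackpressureDrop and onBackpressureBuffer are the standard RxJava 1.x operators; neither avoids the drawbacks just described):

observeBlocksOfFileContents(file)
    .subscribeOn(diskScheduler)
    .onBackpressureDrop()        // silently discards blocks, corrupting the output
    // .onBackpressureBuffer()   // or: buffers without bound, risking OutOfMemoryError
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);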
Are there any other solutions? Or is this something that fundamentally does not fit the ReactiveX observable model?
Upvotes: 2
Views: 483
Reputation: 69997
Implement observeBlocksOfFileContents so that it supports backpressure.
The filesystem is already pull-based (an InputStream.read() happens when you ask for it, rather than data being pushed at you), so pick a reasonable block size and read that much per downstream request:
Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat"),      // open the stream when subscribed
    (in, out) -> {
        byte[] buf = new byte[4096];            // block size; tune as needed
        int r = in.read(buf);
        if (r < 0) {
            out.onCompleted();                  // end of file reached
        } else if (r == buf.length) {
            out.onNext(buf);                    // emit a full block
        } else {
            byte[] buf2 = new byte[r];          // trim the final, partial block
            System.arraycopy(buf, 0, buf2, 0, r);
            out.onNext(buf2);
        }
        return in;                              // state (the stream) is unchanged
    },
    in -> in.close()                            // close the stream on unsubscription
));
(Try-catches omitted for brevity.)
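With such a source in place, the parallel pipeline from the question should work as written; the sketch below restates it, assuming the SyncOnSubscribe above is wrapped inside observeBlocksOfFileContents. Each observeOn in RxJava 1.x keeps a small bounded buffer and requests more from upstream only as it drains, so disk reads are paced by how fast compression and transmission keep up:

observeBlocksOfFileContents(file)       // now a backpressure-aware, one-block-per-request source
    .subscribeOn(diskScheduler)         // blocks are read on the disk scheduler
    .observeOn(cpuScheduler)            // bounded handoff to the CPU scheduler
    .map(compressBlock)
    .observeOn(networkScheduler)        // bounded handoff to the network scheduler
    .subscribe(transmitBlock);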
Upvotes: 1