Edward Peek

Reputation: 163

Handling backpressure without dropping items or serializing in RxJava

In short, is there any solution to resolve backpressure in RxJava without resorting to dropping items, serializing operations or unbounded buffering?

Consider the following task as an example of when this could be useful.

  1. Reading data from disk into memory
  2. Compressing the data
  3. Transmitting the compressed data over the network

The straightforward approach is to do all tasks sequentially on a single background thread, as in:

observeBlocksOfFileContents(file)
    .subscribeOn(backgroundScheduler)
    .map(compressBlock)
    .subscribe(transmitBlock);

While this works, it is suboptimal from a performance perspective: the total runtime is the sum of all three operations, whereas a parallel pipeline is bounded only by the slowest of them:

observeBlocksOfFileContents(file)
    .subscribeOn(diskScheduler)
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);
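To make the sum-versus-max claim concrete (the per-stage costs below are illustrative numbers, not from the question): if each block takes 3 ms to read, 5 ms to compress, and 4 ms to transmit, the sequential version pays all three per block, while the pipelined version is limited by the slowest stage:

```java
public class PipelineMath {
    public static void main(String[] args) {
        // Illustrative per-block costs in milliseconds (assumed, not measured)
        int read = 3, compress = 5, transmit = 4;
        int blocks = 1000;

        // Sequential: every block pays the sum of all stages
        int sequential = blocks * (read + compress + transmit);

        // Pipelined: steady-state throughput is set by the slowest stage
        // (ignoring the small pipeline fill/drain time)
        int pipelined = blocks * Math.max(read, Math.max(compress, transmit));

        System.out.println(sequential + " ms vs " + pipelined + " ms");
    }
}
```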

This can however fail due to backpressure if the data is read from disk faster than it can be compressed and transmitted. The usual backpressure solutions are undesirable for the following reasons:

  1. Drop items: the file must be transmitted in full without missing pieces
  2. Serialize on single thread: the performance improvement of pipelining is lost
  3. Callstack blocking: not supported in RxJava
  4. Increase observeOn buffers: memory consumption may become several times the file size
  5. Reimplement observeOn without MissingBackpressureException: a lot of work and breaks fluent API

Are there any other solutions? Or is this something that fundamentally does not fit the ReactiveX observable model?

Upvotes: 2

Views: 483

Answers (1)

akarnokd

Reputation: 69997

6) Implement observeBlocksOfFileContents so it supports backpressure.

The filesystem is already pull-based (an InputStream.read() happens when you ask for it, not when data is thrown at you), so pick a reasonable block size and read one block per request:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat"),
    (in, out) -> {
        byte[] buf = new byte[4096];
        int r = in.read(buf);
        if (r < 0) {
            out.onCompleted();
        } else if (r == buf.length) {
            out.onNext(buf);
        } else {
            byte[] buf2 = new byte[r];
            System.arraycopy(buf, 0, buf2, 0, r);
            out.onNext(buf2);
        }
        return in;
    },
    in -> in.close()
));

(Try-catches omitted for brevity.)
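The read-and-trim logic from the snippet above can be exercised on its own with plain java.io, no RxJava required. readBlock here is a hypothetical helper mirroring the body of the next lambda (full block emitted as-is, short final block copied out, null signalling end of stream, which would become onCompleted()):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class BlockReader {
    // Reads up to blockSize bytes; trims the final short block.
    // Returns null at end of stream (the onCompleted() case).
    static byte[] readBlock(InputStream in, int blockSize) throws IOException {
        byte[] buf = new byte[blockSize];
        int r = in.read(buf);
        if (r < 0) {
            return null;                  // end of stream
        }
        if (r == buf.length) {
            return buf;                   // full block, emit as-is
        }
        byte[] trimmed = new byte[r];     // short final block: copy out
        System.arraycopy(buf, 0, trimmed, 0, r);
        return trimmed;
    }

    public static void main(String[] args) throws IOException {
        // 10,000 bytes in 4096-byte blocks: two full blocks plus one of 1808
        InputStream in = new ByteArrayInputStream(new byte[10_000]);
        List<byte[]> blocks = new ArrayList<>();
        byte[] b;
        while ((b = readBlock(in, 4096)) != null) {
            blocks.add(b);
        }
        System.out.println(blocks.size());        // block count
        System.out.println(blocks.get(2).length); // size of final block
    }
}
```

Because each read happens only when readBlock is called, the same shape driven by SyncOnSubscribe's per-request callback never produces data faster than downstream asks for it.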

Upvotes: 1
