Z4-
Z4-

Reputation: 1899

Rx Buffering a Fast Producing Stream with Limit + Timeout

I'm trying to optimize web service calls with RxJava by executing batch requests when appropriate but without introducing too much delay to the response. For that I'm using buffer(closingSelector) operator with debounce() as closing selector like this:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */);

It works fine, except that if requestStream produces too fast it emits huge batches, too big for the WS to handle at once, so I'd like to limit the batch size somehow. So I need a closingSelector that emits a close event either if there are X items in the buffer or Y amount of time passed since the last item arrived from the upstream.

I can't seem to find a good solution other than implementing a custom Operator which is similar to OperatorDebounceWithTime but with an internal buffer which returns all elements in the buffer rather than the last one.

Is there an easier way to achieve this e.g. by combining some ops?

Edit:

After posting the question I realised that the code piece above has another problem: if requests flowing continuously faster than debounce timeout (requestStream is producing faster than windowSize) then burstyDeBounced won't emit anything so all requests will be buffered until there is a long enough pause in the incoming stream.

Upvotes: 1

Views: 726

Answers (2)

Z4-
Z4-

Reputation: 1899

I ended up implementing a custom op: https://gist.github.com/zsoltm/79462b37c0943b4fbef2ee3968155f27 it seems to work well. I'm happy to take suggestions for improvements.

Upvotes: 0

akarnokd
akarnokd

Reputation: 69997

You could split the large buffers of the debounced source into smaller ones:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast
    .debounce(windowSize, windowUnit);

burstyMulticast.buffer(burstyDeBounced)
.onBackpressureBuffer()
.concatMapIterable(list -> Lists.partition(list, windowSizeLimit))
.subscribe(...);

where Lists.partition is from Google Guava.

Upvotes: 0

Related Questions