Reputation: 1899
I'm trying to optimize web service calls with RxJava by executing batch requests when appropriate but without introducing too much delay to the response.
For that I'm using buffer(closingSelector)
operator with debounce()
as closing selector like this:
Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */);
It works fine, except that if requestStream
produces too fast it emits huge batches, too big for the WS to handle at once, so I'd like to limit the batch size somehow.
So I need a closingSelector
that emits a close event either if there are X items in the buffer or Y amount of time passed since the last item arrived from the upstream.
I can't seem to find a good solution other than implementing a custom Operator
which is similar to OperatorDebounceWithTime
but with an internal buffer which returns all elements in the buffer rather than the last one.
Is there an easier way to achieve this e.g. by combining some ops?
Edit:
After posting the question I realised that the code piece above has another problem: if requests flowing continuously faster than debounce timeout (requestStream
is producing faster than windowSize
) then burstyDeBounced
won't emit anything so all requests will be buffered until there is a long enough pause in the incoming stream.
Upvotes: 1
Views: 726
Reputation: 1899
I ended up implementing a custom op: https://gist.github.com/zsoltm/79462b37c0943b4fbef2ee3968155f27 it seems to work well. I'm happy to take suggestions for improvements.
Upvotes: 0
Reputation: 69997
You could split the large buffers of the debounced source into smaller ones:
Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast
.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced)
.onBackpressureBuffer()
.concatMapIterable(list -> Lists.partition(list, windowSizeLimit))
.subscribe(...);
where Lists.partition is from Google Guava.
Upvotes: 0