J. Perkins
J. Perkins

Reputation: 4256

RxJava bufferWithTimeOrCount() implementation?

Using RxJava, I need to buffer up a stream of items into groups of 3, but if more than 500ms elapse between incoming items, flush the buffer.

The bufferWithTimeOrCount() operator is exactly what I'm looking for, but it only seems to be implemented for RxJS and Rx.NET, and I need to use RxJava for this one.

Is there a way to duplicate the behavior of bufferWithTimeOrCount() and get what I'm after with the existing RxJava 1.x operators?

buffer(500, TimeUnit.MILLISECONDS, 3) tries to emit a new list every 500ms, not taking into account the time since the last item.

I've been trying to make things work with buffer(bufferClosingSelector) (see RxJava Buffer). I set up a sequence that emits items with an increasing delay between them:

Observable<Long> emitter = Observable
    .range(1, 9)
    .flatMap(n -> {
        return Observable.just(n).delay(n * n * 50, TimeUnit.MILLISECONDS); 
    })
    .timeInterval()
    .map(interval -> interval.getIntervalInMilliseconds());

Then I tried using debounce() to flush the buffer when the time between items grew over 500ms:

emitter
    .buffer(emitter.debounce(500, TimeUnit.MILLISECONDS))
    .toBlocking()
    .subscribe(i -> System.out.println(i));

Which seems to work, producing sequences like:

[51, 150, 250, 352, 449]
[551]
[650]
[749]
[850]

Then I tried to create a counter to flush the buffer after every three items, thinking that I could just merge() it with the debounced observable:

emitter
    .buffer(emitter
       .scan(1L, (n, x) -> n + 1)   // count the items up from 1
       .filter(n -> ((n % 3) == 0)  // emit every three items
    )
    .toBlocking()
    .subscribe(i -> System.out.println(i));

But the results weren't as successful:

[50]
[152, 248, 350]
[452, 550, 650, 749]
[]

If I output the intervals, it appears the counter is running asynchronously to the batching observable, so a race condition is occurring between the time it fires and the time the batch actually gets released:

emitter
    .buffer(emitter
       .doOnNext(i -> System.out.println("i = " + i))
       .scan(1L, (n, x) -> n + 1)   // count the items up from 1
       .filter(n -> ((n % 3) == 0)  // emit every three items
    )
    .toBlocking()
    .subscribe(i -> System.out.println(i));

Produces:

i = 63
i = 150
[51, 150]
i = 249
i = 350
i = 451
[249, 351]
i = 550
i = 650
i = 750
[450, 550, 650, 750]
i = 850
[850]

...and now I'm kind of stumped.

That was long, but TL;DR: Is there a way to duplicate the behavior of bufferWithTimeOrCount() and get what I'm after with the existing RxJava 1.x operators? Thanks!

Upvotes: 2

Views: 523

Answers (1)

Kiskae
Kiskae

Reputation: 25573

EDIT: after looking at the problem again I realized this solution would always emit for every 3rd item regardless if it already emitted due to a debounce. I cannot immediately think of a way to 'reset' the counting buffer after a debounce.

The problem is that by using emitter twice you create two independent streams with non-deterministic behavior since the scheduler is involved.

The key is combining the buffer(...), debounce(...) and publish(...) operators:

emitter.publish(source -> {
    // Create the buffer here using a shared source
    return source.buffer(Observable.defer(() ->
         // Merge these reasons for closing the buffer
         Observable.merge(
             // Either after 500 ms
             source.debounce(500, TimeUnit.MILLISECONDS),
             // or every 3 items
             source.buffer(3)
         )
    ))
})

I'm not able to test this solution, but the comments should explain the idea behind it. The Observable.defer ensures the observables responsible for closing the windows are created after the original source.

Upvotes: 0

Related Questions