J. Perkins

Reputation: 4266

How to flush an Rx buffer onComplete?

I'm using RxJava, but might be able to translate an answer from another implementation.

I have an Observable emitting a sequence of items, and I want to batch them up into groups of 10. Something like this:

observable
    .buffer(10)
    .map(items -> {
        StringBuilder sb = new StringBuilder();
        for (String item : items)
            sb.append(item);
        return sb.toString();
    })
    .subscribe(...)

Works great. But I need to be able to flush incomplete buffers. Right now, if my observable completes after emitting only nine strings, my subscriber will never get called.

Is there a way to do this with the built-in operators, or do I have to create a custom observer to do the buffering while watching for onComplete?

Thanks!

Update: After further investigation, the problem turned out to be upstream, where I had some custom observables that worked fine under test but broke when wired together. I was essentially doing this:

Observable.never()
   .mergeWith(Observable.range(1,8))
   .buffer(5)

I haven't quite gotten my head around why, but that never() prevents partial batches from getting processed. This, however, works great:

Observable.empty()
   .mergeWith(Observable.range(1,8))
   .buffer(5)

I'll go study up. Thanks for the help!

Upvotes: 4

Views: 1869

Answers (1)

paul

Reputation: 13471

I don't know why you're not getting the last items from the buffer. Buffer emits items in groups, but it does emit all of them, including a final partial group once the source completes.

Check this example:

@Test
public void stringBuffer() {
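    // 11 items with buffer(5) yield the groups [0..4], [5..9] and [10]
    Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    Observable.from(numbers)
            .map(number -> "uniqueKey=" + number)
            .buffer(5)
            // join each group into a single query-string-like line
            .map(ns -> String.join("&", ns))
            .subscribe(System.out::println);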
}

Even if the last number, 10, is the only element in its group, it's emitted.
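
To make the flush behaviour concrete, here is a minimal plain-Java model of count-based buffering. It is only a sketch and not RxJava's actual implementation: the class BufferSketch and its methods are made-up names for illustration. It shows that the partial group is released by the completion signal. In RxJava, a merged stream completes only after all of its sources complete, so merging with never() means that signal never arrives, while empty() completes immediately and lets the last group through.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of buffer(count); not RxJava's implementation. */
class BufferSketch {
    private final int count;
    private List<String> current = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    BufferSketch(int count) {
        this.count = count;
    }

    /** Collects an item and emits the group as soon as it is full. */
    void onNext(String item) {
        current.add(item);
        if (current.size() == count) {
            emitted.add(current);
            current = new ArrayList<>();
        }
    }

    /** Flushes a non-empty partial group; only runs if the source completes. */
    void onCompleted() {
        if (!current.isEmpty()) {
            emitted.add(current);
            current = new ArrayList<>();
        }
    }

    /** Groups handed downstream so far. */
    List<List<String>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Source that completes (comparable to merging with empty())
        BufferSketch completing = new BufferSketch(5);
        for (int i = 1; i <= 8; i++) completing.onNext("item" + i);
        completing.onCompleted();
        System.out.println(completing.emitted().size()); // 2 groups: 5 items + 3 items

        // Source that never completes (comparable to merging with never())
        BufferSketch pending = new BufferSketch(5);
        for (int i = 1; i <= 8; i++) pending.onNext("item" + i);
        System.out.println(pending.emitted().size()); // 1 group; 3 items stay buffered
    }
}
```

So if a partial group never shows up, it is worth checking whether onCompleted actually reaches the buffer operator.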

You can see more examples of buffer here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java

Also take a look at the Window operator; it may work better for your use case:

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableWindow.java

Upvotes: 1
