Reputation: 4266
I'm using RxJava, but might be able to translate an answer from another implementation.
I have an Observable emitting a sequence of items, and I want to batch them up into groups of 10. Something like this:
observable
    .buffer(10)
    .map(items -> {
        StringBuilder sb = new StringBuilder();
        for (String item : items)
            sb.append(item);
        return sb.toString();
    })
    .subscribe(...)
Works great. But I need to be able to flush incomplete buffers. Right now, if my observable completes after emitting only nine strings, my subscriber will never get called.
Is there a way to do this with the built-in operators, or do I have to create a custom observer to do the buffering while watching for onComplete?
Thanks!
Update: After further investigation, the problem turned out to be upstream, where I had some custom observables that worked fine under test but broke when wired together. I was essentially doing this:
Observable.never()
    .mergeWith(Observable.range(1,8))
    .buffer(5)
I haven't quite gotten my head around why, but that never() prevents partial batches from getting processed. This, however, works great:
Observable.empty()
    .mergeWith(Observable.range(1,8))
    .buffer(5)
I'll go study up. Thanks for the help!
Upvotes: 4
Views: 1869
Reputation: 13471
I don't know why you're not getting the last items from the buffer. Buffer emits items in groups, but it does emit all of them, including a final group that is not full.
Check this example:
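@Test
public void stringBuffer() {
    Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    Observable.from(numbers)
        .map(number -> "uniqueKey=" + number)
        // Group into lists of 5; the last list holds only "uniqueKey=10".
        .buffer(5)
        // Join each group into one string such as "uniqueKey=0&uniqueKey=1&...".
        .map(ns -> String.join("&", ns))
        .subscribe(System.out::println);
}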
Even though the last number, 10, is the only element in its group, it's still emitted.
You can see more examples of buffer here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java
Also take a look at the Window operator; it may work better for your use case.
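One thing worth checking is whether your source actually completes, since the final partial group is only emitted when the upstream completes. Here is a minimal plain-Java sketch of that rule. Note that CountBuffer and BufferSketch are hypothetical names made up for illustration; this is a simplified model of count-based buffering, not RxJava's actual implementation, and the completes flag just stands in for a stream that finishes versus one that never does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of count-based buffering; NOT RxJava's real code.
class CountBuffer<T> {
    private final int count;
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();

    CountBuffer(int count, Consumer<List<T>> downstream) {
        this.count = count;
        this.downstream = downstream;
    }

    // Called for every upstream item; emits a batch as soon as it is full.
    void onNext(T item) {
        current.add(item);
        if (current.size() == count) {
            flush();
        }
    }

    // Called when upstream completes; the only place a partial batch is flushed.
    void onCompleted() {
        if (!current.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        downstream.accept(current);
        current = new ArrayList<>();
    }
}

class BufferSketch {
    // Feeds 1..8 into a buffer of 5. The flag models a source that
    // completes (true) versus one that never completes (false).
    static List<List<Integer>> run(boolean completes) {
        List<List<Integer>> batches = new ArrayList<>();
        CountBuffer<Integer> buffer = new CountBuffer<>(5, batches::add);
        for (int i = 1; i <= 8; i++) {
            buffer.onNext(i);
        }
        if (completes) {
            buffer.onCompleted();
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [[1, 2, 3, 4, 5], [6, 7, 8]]
        System.out.println(run(false));  // [[1, 2, 3, 4, 5]]
    }
}
```

In the second call the items 6, 7 and 8 stay stuck in the buffer because no completion signal ever arrives, which would look exactly like a subscriber that is never called for the last batch.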
Upvotes: 1