Reputation: 3615
I have an observable that emits a large number of items, such as
[1,1,1,2,2,2,3,3,1,1,5,5......]
In RxJava we can use distinctUntilChanged() to emit an item only when it differs from the previous one, which would result in
[1,2,3,1,5,......]
Similarly, is there a way to buffer the items until the value changes? For example, I expect output like
[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]
Upvotes: 3
Views: 1013
Reputation: 69997
You can share the source sequence and apply distinctUntilChanged to one path, which then drives a buffer operator that uses an Observable to indicate boundaries:
@Test
@SuppressWarnings("unchecked")
public void test() {
    Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
    .compose(bufferUntilChanged(v -> v))
    .test()
    .assertResult(
        Arrays.asList(1, 1, 1),
        Arrays.asList(2, 2, 2),
        Arrays.asList(3, 3),
        Arrays.asList(1, 1),
        Arrays.asList(5, 5)
    );
}

static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
        Function<T, K> keySelector) {
    return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}
The skip(1) is there because the very first item passing through distinctUntilChanged would otherwise signal a boundary immediately, so the very first buffer would be empty.
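To see why the first boundary matters, here is a minimal plain-Java sketch that models the boundary behaviour over a list, without RxJava. It is only an illustration of the semantics described above, not how RxJava implements buffer; the class name BufferUntilChangedSketch and the skipFirstBoundary flag are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper class, for illustration only (not part of RxJava).
final class BufferUntilChangedSketch {

    // Models buffer(boundary): every boundary signal closes the current
    // buffer (even an empty one) and opens a new one. A boundary fires when
    // the key differs from the previous key; the first item always counts
    // as a change, just as distinctUntilChanged always emits its first item.
    static <T, K> List<List<T>> bufferUntilChanged(
            List<T> items, Function<T, K> keySelector, boolean skipFirstBoundary) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        boolean first = true;
        K previousKey = null;
        for (T item : items) {
            K key = keySelector.apply(item);
            boolean changed = first || !Objects.equals(previousKey, key);
            // skipFirstBoundary plays the role of skip(1) on the boundary path.
            if (changed && !(first && skipFirstBoundary)) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
            previousKey = key;
            first = false;
        }
        // In this model, completion flushes whatever is left in the last buffer.
        result.add(current);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 3, 1, 1, 5, 5);
        // With the first boundary skipped:
        // [[1, 1, 1], [2, 2, 2], [3, 3], [1, 1], [5, 5]]
        System.out.println(bufferUntilChanged(source, v -> v, true));
        // Without skipping, an empty buffer comes first:
        // [[], [1, 1, 1], [2, 2, 2], [3, 3], [1, 1], [5, 5]]
        System.out.println(bufferUntilChanged(source, v -> v, false));
    }
}
```

With skipFirstBoundary set to false, the model emits a leading empty list, which is the effect that skip(1) avoids in the transformer above.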
Upvotes: 7