Manu
Manu

Reputation: 3615

RxJava buffer until changed

I have an observable which emits large number of data like

[1,1,1,2,2,2,3,3,1,1,5,5......]

In RxJava we can use distinctUntilChanged() to get a distinct items until it changed and it would result like

[1,2,3,1,5,......]

Similarly is there a way to buffer the items until changed? for example I expect an output like

[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]

Upvotes: 3

Views: 1013

Answers (1)

akarnokd
akarnokd

Reputation: 69997

You can share the source sequence, apply distinctUntilChanged to one path which will then drive a buffer operator that uses an Observable to indicate boundaries:

@Test
@SuppressWarnings("unchecked")
public void test() {
    Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
    .compose(bufferUntilChanged(v -> v))
    .test()
    .assertResult(
            Arrays.asList(1, 1, 1),
            Arrays.asList(2, 2, 2),
            Arrays.asList(3, 3),
            Arrays.asList(1, 1),
            Arrays.asList(5, 5)
        );
}

static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
        Function<T, K> keySelector) {
    return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}

The skip(1) is there because the very first item passing through distinctUntilChanged would trigger a new buffer, having the very first buffer empty.

Upvotes: 7

Related Questions